feat(cdc): support more metadata columns for MySQL, PG and MongoDB #17051

Merged: 15 commits, Jun 5, 2024
27 changes: 25 additions & 2 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -40,7 +40,10 @@ create table rw.products_test ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) include timestamp as commit_ts from mysql_mytest table 'mytest.products';
) include timestamp as commit_ts
include database_name as database_name
include table_name as table_name
from mysql_mytest table 'mytest.products';

# sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from the Debezium message stream instead of the backfill.
sleep 10s
@@ -153,6 +156,14 @@ SELECT id,name,description FROM rw.products_test order by id limit 3
102 car battery 12V car battery
103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3

query TT
select database_name, table_name from rw.products_test limit 3;
----
mytest products
mytest products
mytest products


# commit_ts of historical records should be '1970-01-01 00:00:00+00:00'
query I
SELECT count(*) as cnt from rw.products_test where commit_ts = '1970-01-01 00:00:00+00:00'
@@ -247,7 +258,11 @@ CREATE TABLE person_new (
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts FROM pg_source TABLE 'public.person';
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM pg_source TABLE 'public.person';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;
@@ -276,6 +291,14 @@ SELECT * from person_new_cnt
----
6

query TTT
SELECT database_name,schema_name,table_name from person_new limit 3;
----
cdc_test public person
cdc_test public person
cdc_test public person


query ITTTT
SELECT id,name,email_address,credit_card,city from person_new order by id;
----
13 changes: 12 additions & 1 deletion e2e_test/source/cdc/mongodb/mongodb_basic.slt
@@ -2,7 +2,11 @@
control substitution on

statement ok
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) INCLUDE TIMESTAMP as commit_ts WITH (
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB)
INCLUDE TIMESTAMP as commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE COLLECTION_NAME as collection_name
WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'random_data.*'
@@ -30,5 +34,12 @@ select count(*) from users where commit_ts = '1970-01-01 00:00:00+00:00';
----
55

query TT
select database_name, collection_name FROM users LIMIT 2;
----
random_data users
random_data users


statement ok
DROP TABLE users cascade
13 changes: 13 additions & 0 deletions proto/plan_common.proto
@@ -203,6 +203,15 @@ message AdditionalColumnHeader {
data.DataType data_type = 2;
}

// metadata column for cdc table
message AdditionalDatabaseName {}

message AdditionalSchemaName {}

message AdditionalTableName {}

message AdditionalCollectionName {}

// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

@@ -215,6 +224,10 @@ message AdditionalColumn {
AdditionalColumnHeader header_inner = 5;
AdditionalColumnFilename filename = 6;
AdditionalColumnHeaders headers = 7;
AdditionalDatabaseName database_name = 8;
Member:
Since they're all empty messages, can we use the same message type instead? Or directly use google.protobuf.Empty?

Contributor:
IIRC, prost has some trouble with google well-known types.
Also, I use message structs for future extensibility of the column definitions, such as:

message AdditionalColumnHeader {
  string inner_field = 1;
  data.DataType data_type = 2;
}

so that we won't have to handle such changes inside the catalog in the future.

Contributor Author:
message AdditionalColumn {
  oneof column_type {
    AdditionalColumnKey key = 1;
    AdditionalColumnTimestamp timestamp = 2;
    AdditionalColumnPartition partition = 3;
    AdditionalColumnOffset offset = 4;
    AdditionalColumnHeader header_inner = 5;
    AdditionalColumnFilename filename = 6;
    AdditionalColumnHeaders headers = 7;
  }
}

I think we need different message types for the variants of the oneof.

AdditionalSchemaName schema_name = 9;
AdditionalTableName table_name = 10;
AdditionalCollectionName collection_name = 11;
}
}
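For orientation, here is a minimal sketch of how the generated oneof is typically consumed on the Rust side. The `risingwave_pb` paths match the imports used later in this diff, but the helper function itself is hypothetical and not part of this PR; the point is that distinct (even if empty) message types keep each metadata column its own match arm:

// Hypothetical helper for illustration only; not part of this PR.
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::AdditionalColumn;

fn is_cdc_metadata_column(col: &AdditionalColumn) -> bool {
    matches!(
        col.column_type,
        Some(
            ColumnType::DatabaseName(_)
                | ColumnType::SchemaName(_)
                | ColumnType::TableName(_)
                | ColumnType::CollectionName(_)
        )
    )
}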

6 changes: 5 additions & 1 deletion src/compute/tests/cdc_tests.rs
@@ -169,7 +169,10 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MySqlOffset::new(binlog_file.clone(), 10),
];

let table_name = SchemaTableName::new("mock_table".to_string(), "public".to_string());
let table_name = SchemaTableName {
schema_name: "public".to_string(),
table_name: "mock_table".to_string(),
};
let table_schema = Schema::new(vec![
Field::with_name(DataType::Int64, "id"), // primary key
Field::with_name(DataType::Float64, "price"),
@@ -179,6 +182,7 @@
let external_table = ExternalStorageTable::new(
TableId::new(1234),
table_name,
"mydb".to_string(),
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
table_schema.clone(),
table_pk_order_types,
73 changes: 68 additions & 5 deletions src/connector/src/parser/additional_columns.rs
@@ -22,9 +22,10 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as PbDataType;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders,
AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition,
AdditionalColumnTimestamp,
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName,
AdditionalSchemaName, AdditionalTableName,
};

use crate::error::ConnectorResult;
@@ -59,14 +60,27 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
HashSet::from(["timestamp", "partition", "offset"]),
HashSet::from([
"timestamp",
"partition",
"offset",
"database_name",
"collection_name",
]),
),
])
});

// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
LazyLock::new(|| Some(HashSet::from(["timestamp"])));
LazyLock::new(|| {
Some(HashSet::from([
"timestamp",
"database_name",
"schema_name",
"table_name",
]))
});

pub fn get_supported_additional_columns(
connector_name: &str,
@@ -201,6 +215,55 @@ pub fn build_additional_column_catalog(
is_hidden: false,
},
"header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
"database_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::DatabaseName(
AdditionalDatabaseName {},
)),
},
),
is_hidden: false,
},
"schema_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
},
),
is_hidden: false,
},

"table_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
},
),
is_hidden: false,
},
"collection_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::CollectionName(
AdditionalCollectionName {},
)),
},
),
is_hidden: false,
},
_ => unreachable!(),
};

29 changes: 28 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -56,7 +56,8 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::maxwell::MaxwellParser;
use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
use crate::parser::util::{
extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta,
extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
extreact_timestamp_from_meta,
};
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
@@ -401,12 +402,38 @@ impl SourceStreamChunkRowWriter<'_> {
.unwrap(), // handled all match cases in internal match, unwrap is safe
));
}

(
_, // for cdc tables
&Some(ref col @ AdditionalColumnType::DatabaseName(_))
| &Some(ref col @ AdditionalColumnType::TableName(_)),
) => {
match self.row_meta {
Some(row_meta) => {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
Ok(A::output_for(
extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())?
.unwrap_or(None),
))
} else {
Err(AccessError::Uncategorized {
message: "CDC metadata not found in the message".to_string(),
})
}
}
None => parse_field(desc), // parse from payload
}
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
)),
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
// collection name for `mongodb-cdc` should be parsed from the message payload
parse_field(desc)
}
(_, &Some(AdditionalColumnType::Partition(_))) => {
// the meta info does not involve a specific connector
return Ok(A::output_for(
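The `DatabaseName`/`TableName` arm above reads the values from the `DebeziumCdcMeta` attached to each message (via `extract_cdc_meta_column`) rather than from the payload. A rough sketch of the idea, assuming the meta carries a qualified name such as "mytest.products"; the helper below is illustrative and not the PR's actual code:

// Illustrative only: derive the database/table metadata columns from a
// qualified table name carried in the CDC message meta.
fn split_full_table_name(full_table_name: &str) -> (Option<&str>, Option<&str>) {
    match full_table_name.split_once('.') {
        Some((db, table)) => (Some(db), Some(table)),
        None => (None, Some(full_table_name)),
    }
}

#[test]
fn splits_qualified_name() {
    assert_eq!(
        split_full_table_name("mytest.products"),
        (Some("mytest"), Some("products"))
    );
}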
36 changes: 16 additions & 20 deletions src/connector/src/parser/plain_parser.rs
@@ -278,11 +278,11 @@ mod tests {
if i == 0 {
// put begin message at first
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: transactional,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
transactional,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -292,11 +292,11 @@
// put data messages
for data_msg in batch {
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: false,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
false,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -306,11 +306,11 @@
if i == data_batches.len() - 1 {
// put commit message at last
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: transactional,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
transactional,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -355,11 +355,7 @@
let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: true,
});
let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new("orders".to_string(), 0, true));
let msg_meta = MessageMeta {
meta: &cdc_meta,
split_id: "1001",
30 changes: 28 additions & 2 deletions src/connector/src/parser/unified/debezium.rs
@@ -60,8 +60,14 @@ pub struct DebeziumChangeEvent<A> {

const BEFORE: &str = "before";
const AFTER: &str = "after";

const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

const OP: &str = "op";
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";
@@ -188,8 +194,8 @@ where
.access(&[AFTER, &desc.name], &desc.data_type)
},
|additional_column_type| {
match additional_column_type {
&ColumnType::Timestamp(_) => {
match *additional_column_type {
ColumnType::Timestamp(_) => {
// access payload.source.ts_ms
let ts_ms = self
.value_accessor
@@ -202,6 +208,26 @@
.to_scalar_value()
}))
}
ColumnType::DatabaseName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_DB], &desc.data_type),
ColumnType::SchemaName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
ColumnType::TableName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
ColumnType::CollectionName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
_ => Err(AccessError::UnsupportedAdditionalColumn {
name: desc.name.clone(),
}),
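For reference, the `SOURCE_*` constants above name fields inside the `source` block of a Debezium change-event envelope. The sketch below is illustrative only: the values are made up, and a real event carries only the fields relevant to its connector (`schema` for Postgres, `collection` for MongoDB):

// Illustrative Debezium envelope; not part of this PR.
use serde_json::json;

fn main() {
    let event = json!({
        "payload": {
            "before": null,
            "after": { "id": 101, "name": "scooter" },
            "source": {
                "ts_ms": 1717564800000_i64, // -> INCLUDE TIMESTAMP AS commit_ts
                "db": "mytest",             // -> INCLUDE DATABASE_NAME
                "schema": "public",         // -> INCLUDE SCHEMA_NAME (Postgres)
                "table": "products",        // -> INCLUDE TABLE_NAME (MySQL/Postgres)
                "collection": "users"       // -> INCLUDE COLLECTION_NAME (MongoDB)
            },
            "op": "c"
        }
    });
    // DebeziumChangeEvent resolves the new metadata columns via paths like
    // ["source", "db"], mirroring the constants above.
    assert_eq!(event["payload"]["source"]["db"], "mytest");
}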