feat(cdc): support more metadata columns for MySQL, PG and MongoDB (#…
StrikeW authored Jun 5, 2024
1 parent b9dcdd3 commit e4f1c48
Showing 17 changed files with 353 additions and 84 deletions.
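
In short, the commit exposes Debezium source metadata on CDC tables through additional INCLUDE clauses: database_name and table_name for MySQL, database_name, schema_name, and table_name for Postgres, and database_name and collection_name for MongoDB. A minimal sketch of the Postgres flavor (column list trimmed; it assumes a shared CDC source pg_source, as in the e2e tests below, which exercise the same syntax):

CREATE TABLE person_new (
    id int,
    name varchar,
    PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
  INCLUDE DATABASE_NAME AS database_name
  INCLUDE SCHEMA_NAME AS schema_name
  INCLUDE TABLE_NAME AS table_name
FROM pg_source TABLE 'public.person';

-- the metadata columns then read like ordinary columns
SELECT database_name, schema_name, table_name FROM person_new LIMIT 1;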
27 changes: 25 additions & 2 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -40,7 +40,10 @@ create table rw.products_test ( id INT,
name STRING,
description STRING,
PRIMARY KEY (id)
) include timestamp as commit_ts from mysql_mytest table 'mytest.products';
) include timestamp as commit_ts
include database_name as database_name
include table_name as table_name
from mysql_mytest table 'mytest.products';

# sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from Debezium message instead of backfill.
sleep 10s
@@ -153,6 +156,14 @@ SELECT id,name,description FROM rw.products_test order by id limit 3
102 car battery 12V car battery
103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3

query TT
select database_name, table_name from rw.products_test limit 3;
----
mytest products
mytest products
mytest products


# commit_ts of historical records should be '1970-01-01 00:00:00+00:00'
query I
SELECT count(*) as cnt from rw.products_test where commit_ts = '1970-01-01 00:00:00+00:00'
@@ -247,7 +258,11 @@ CREATE TABLE person_new (
credit_card varchar,
city varchar,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts FROM pg_source TABLE 'public.person';
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM pg_source TABLE 'public.person';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;
@@ -276,6 +291,14 @@ SELECT * from person_new_cnt
----
6

query TTT
SELECT database_name,schema_name,table_name from person_new limit 3;
----
cdc_test public person
cdc_test public person
cdc_test public person


query ITTTT
SELECT id,name,email_address,credit_card,city from person_new order by id;
----
13 changes: 12 additions & 1 deletion e2e_test/source/cdc/mongodb/mongodb_basic.slt
@@ -2,7 +2,11 @@
control substitution on

statement ok
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) INCLUDE TIMESTAMP as commit_ts WITH (
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB)
INCLUDE TIMESTAMP as commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE COLLECTION_NAME as collection_name
WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'random_data.*'
@@ -30,5 +34,12 @@ select count(*) from users where commit_ts = '1970-01-01 00:00:00+00:00';
----
55

query TT
select database_name, collection_name FROM users LIMIT 2;
----
random_data users
random_data users


statement ok
DROP TABLE users cascade
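
Because the source above captures collections by pattern (collection.name = 'random_data.*'), a single table can hold documents from several upstream collections, and the new metadata columns let those rows be attributed back to their origin. A hypothetical query (not part of the test suite) sketching that use:

SELECT database_name, collection_name, count(*) AS cnt
FROM users
GROUP BY database_name, collection_name;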
13 changes: 13 additions & 0 deletions proto/plan_common.proto
@@ -203,6 +203,15 @@ message AdditionalColumnHeader {
data.DataType data_type = 2;
}

// metadata column for cdc table
message AdditionalDatabaseName {}

message AdditionalSchemaName {}

message AdditionalTableName {}

message AdditionalCollectionName {}

// this type means we read all headers as a whole
message AdditionalColumnHeaders {}

@@ -215,6 +224,10 @@ message AdditionalColumn {
AdditionalColumnHeader header_inner = 5;
AdditionalColumnFilename filename = 6;
AdditionalColumnHeaders headers = 7;
AdditionalDatabaseName database_name = 8;
AdditionalSchemaName schema_name = 9;
AdditionalTableName table_name = 10;
AdditionalCollectionName collection_name = 11;
}
}

6 changes: 5 additions & 1 deletion src/compute/tests/cdc_tests.rs
@@ -169,7 +169,10 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MySqlOffset::new(binlog_file.clone(), 10),
];

let table_name = SchemaTableName::new("mock_table".to_string(), "public".to_string());
let table_name = SchemaTableName {
schema_name: "public".to_string(),
table_name: "mock_table".to_string(),
};
let table_schema = Schema::new(vec![
Field::with_name(DataType::Int64, "id"), // primary key
Field::with_name(DataType::Float64, "price"),
@@ -179,6 +182,7 @@
let external_table = ExternalStorageTable::new(
TableId::new(1234),
table_name,
"mydb".to_string(),
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
table_schema.clone(),
table_pk_order_types,
73 changes: 68 additions & 5 deletions src/connector/src/parser/additional_columns.rs
@@ -22,9 +22,10 @@ use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as PbDataType;
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use risingwave_pb::plan_common::{
AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders,
AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition,
AdditionalColumnTimestamp,
AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader,
AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset,
AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName,
AdditionalSchemaName, AdditionalTableName,
};

use crate::error::ConnectorResult;
@@ -59,14 +60,27 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
HashSet::from(["timestamp", "partition", "offset"]),
HashSet::from([
"timestamp",
"partition",
"offset",
"database_name",
"collection_name",
]),
),
])
});

// For CDC backfill table, the additional columns are added to the schema of `StreamCdcScan`
pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock<Option<HashSet<&'static str>>> =
LazyLock::new(|| Some(HashSet::from(["timestamp"])));
LazyLock::new(|| {
Some(HashSet::from([
"timestamp",
"database_name",
"schema_name",
"table_name",
]))
});

pub fn get_supported_additional_columns(
connector_name: &str,
@@ -201,6 +215,55 @@ pub fn build_additional_column_catalog(
is_hidden: false,
},
"header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
"database_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::DatabaseName(
AdditionalDatabaseName {},
)),
},
),
is_hidden: false,
},
"schema_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})),
},
),
is_hidden: false,
},

"table_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})),
},
),
is_hidden: false,
},
"collection_name" => ColumnCatalog {
column_desc: ColumnDesc::named_with_additional_column(
column_name,
column_id,
DataType::Varchar,
AdditionalColumn {
column_type: Some(AdditionalColumnType::CollectionName(
AdditionalCollectionName {},
)),
},
),
is_hidden: false,
},
_ => unreachable!(),
};

29 changes: 28 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -56,7 +56,8 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::maxwell::MaxwellParser;
use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder;
use crate::parser::util::{
extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta,
extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
extreact_timestamp_from_meta,
};
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
@@ -401,12 +402,38 @@ impl SourceStreamChunkRowWriter<'_> {
.unwrap(), // handled all match cases in internal match, unwrap is safe
));
}

(
_, // for cdc tables
&Some(ref col @ AdditionalColumnType::DatabaseName(_))
| &Some(ref col @ AdditionalColumnType::TableName(_)),
) => {
match self.row_meta {
Some(row_meta) => {
if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta {
Ok(A::output_for(
extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())?
.unwrap_or(None),
))
} else {
Err(AccessError::Uncategorized {
message: "CDC metadata not found in the message".to_string(),
})
}
}
None => parse_field(desc), // parse from payload
}
}
(_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
Some(row_meta) => Ok(A::output_for(
extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
)),
None => parse_field(desc), // parse from payload
},
(_, &Some(AdditionalColumnType::CollectionName(_))) => {
// collection name for `mongodb-cdc` should be parsed from the message payload
parse_field(desc)
}
(_, &Some(AdditionalColumnType::Partition(_))) => {
// the meta info does not involve spec connector
return Ok(A::output_for(
36 changes: 16 additions & 20 deletions src/connector/src/parser/plain_parser.rs
@@ -278,11 +278,11 @@ mod tests {
if i == 0 {
// put begin message at first
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: transactional,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
transactional,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -292,11 +292,11 @@
// put data messages
for data_msg in batch {
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: false,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
false,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -306,11 +306,11 @@
if i == data_batches.len() - 1 {
// put commit message at last
source_msg_batch.push(SourceMessage {
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: transactional,
}),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
"orders".to_string(),
0,
transactional,
)),
split_id: SplitId::from("1001"),
offset: "0".into(),
key: None,
@@ -355,11 +355,7 @@ mod tests {
let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: "orders".to_string(),
source_ts_ms: 0,
is_transaction_meta: true,
});
let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new("orders".to_string(), 0, true));
let msg_meta = MessageMeta {
meta: &cdc_meta,
split_id: "1001",
30 changes: 28 additions & 2 deletions src/connector/src/parser/unified/debezium.rs
@@ -60,8 +60,14 @@ pub struct DebeziumChangeEvent<A> {

const BEFORE: &str = "before";
const AFTER: &str = "after";

const SOURCE: &str = "source";
const SOURCE_TS_MS: &str = "ts_ms";
const SOURCE_DB: &str = "db";
const SOURCE_SCHEMA: &str = "schema";
const SOURCE_TABLE: &str = "table";
const SOURCE_COLLECTION: &str = "collection";

const OP: &str = "op";
pub const TRANSACTION_STATUS: &str = "status";
pub const TRANSACTION_ID: &str = "id";
@@ -188,8 +194,8 @@ where
.access(&[AFTER, &desc.name], &desc.data_type)
},
|additional_column_type| {
match additional_column_type {
&ColumnType::Timestamp(_) => {
match *additional_column_type {
ColumnType::Timestamp(_) => {
// access payload.source.ts_ms
let ts_ms = self
.value_accessor
Expand All @@ -202,6 +208,26 @@ where
.to_scalar_value()
}))
}
ColumnType::DatabaseName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_DB], &desc.data_type),
ColumnType::SchemaName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type),
ColumnType::TableName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_TABLE], &desc.data_type),
ColumnType::CollectionName(_) => self
.value_accessor
.as_ref()
.expect("value_accessor must be provided for upsert operation")
.access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type),
_ => Err(AccessError::UnsupportedAdditionalColumn {
name: desc.name.clone(),
}),