Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafka backfill frontend #15602

Merged
merged 34 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9c44201
fix: deny create MV on shared CDC source
xxchan Mar 12, 2024
55fde18
feat: add kafka backfill frontend
xxchan Jan 2, 2024
2285c1f
add config for easy of testing
xxchan Feb 28, 2024
6aa8502
cleanup silly issues
xxchan Mar 13, 2024
4c7ae69
fix shared cdc source
xxchan Mar 13, 2024
2daa0a8
fix test
xxchan Mar 13, 2024
8d996f4
rename has_streaming_job -> is_shared
xxchan Mar 15, 2024
82afb30
refine add_partition_offset_cols
xxchan Mar 15, 2024
5abbb11
rename the plan SourceBackfill -> SourceScan
xxchan Mar 15, 2024
fef7611
rename upstream_mview_actors->upstream_root_actors
xxchan Mar 15, 2024
8805238
refine source_manager
xxchan Mar 18, 2024
d028469
refine rewrite_inner, unif CdcFilter and SourceBackfill
xxchan Mar 18, 2024
cc8cb41
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 18, 2024
4f850de
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 19, 2024
31b34f0
rename reuseable source -> shared source
xxchan Mar 19, 2024
87c7cad
Merge branch 'main' into xxchan/backfill-fe
xxchan Mar 25, 2024
27db3f6
fix compile
xxchan Mar 25, 2024
9f8f533
fix cdc table stuck. due to track id
xxchan Mar 25, 2024
78a783f
fix
xxchan Mar 25, 2024
06027ca
fix pg_settings order
xxchan Mar 25, 2024
fa521f0
really fix tracking_progress_actor_ids
xxchan Mar 25, 2024
58034cc
- no need for a separated LogicalSourceScan node.
xxchan Mar 26, 2024
18ea279
add (very simple) e2e tests
xxchan Mar 26, 2024
658cb52
fix executor bug: should ignore backfilled rows when the partition is…
xxchan Mar 26, 2024
231b93c
support scaling source backfill
xxchan Mar 26, 2024
0339b38
Revert "support scaling source backfill" (For a new PR)
xxchan Mar 28, 2024
17e3779
refine
xxchan Mar 28, 2024
2e5c1c0
fix
xxchan Mar 28, 2024
144d7ae
Merge remote-tracking branch 'origin/main' into xxchan/backfill-fe
xxchan Mar 31, 2024
e174c12
don't deprecate cdc_source_job for compatibility
xxchan Apr 1, 2024
98cfb72
minor fix
xxchan Apr 1, 2024
f169dec
proto: rename cdc_source_job without renaming
xxchan Apr 2, 2024
aafba45
rename backfillable -> shareable
xxchan Apr 2, 2024
a3accdb
Merge remote-tracking branch 'origin/main' into xxchan/backfill-fe
xxchan Apr 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ d70dba827c303373f3220c9733f7c7443e5c2d37

# chore: cargo +nightly fmt (#13162) (format let-chains)
c583e2c6c054764249acf484438c7bf7197765f4

# chore: replace all ProstXxx with PbXxx (#8621)
6fd8821f2e053957b183d648bea9c95b6703941f
4 changes: 2 additions & 2 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ if ${is_not_ci}
no_rust_log = not ${rust_log}

if ${no_rust_log}
set_env RUST_LOG "pgwire_query_log=info"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info"
xxchan marked this conversation as resolved.
Show resolved Hide resolved
else
set_env RUST_LOG "pgwire_query_log=info,${rust_log}"
set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info,${rust_log}"
end
end

Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ user rw_batch_enable_lookup_join
user rw_batch_enable_sort_agg
user rw_enable_join_ordering
user rw_enable_share_plan
user rw_enable_shared_source
user rw_enable_two_phase_agg
user rw_force_split_distinct_agg
user rw_force_two_phase_agg
Expand Down
7 changes: 7 additions & 0 deletions e2e_test/source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Test in this directory needs some prior setup.

See also `ci/scripts/e2e-source-test.sh`, and `scripts/source`

kwannoel marked this conversation as resolved.
Show resolved Hide resolved
## Kafka

`scripts/source/test_data` contains the data. Filename's convention is `<topic_name>.<n_partitions>`.
58 changes: 58 additions & 0 deletions e2e_test/source/basic/kafka_shared_source.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
control substitution on

statement ok
SET rw_enable_shared_source TO true;

statement ok
create source s0 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic',
properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_1 as select * from s0;

statement ok
SET rw_enable_shared_source TO false;

statement ok
create materialized view mv_2 as select * from s0;

statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 1s

query IT rowsort
select v1, v2 from s0;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_1;
----
1 1
2 22
3 333
4 4444

query IT rowsort
select v1, v2 from mv_2;
----
1 1
2 22
3 333
4 4444

# TODO: add more data to the topic and re-check the data. Currently there's no good test infra to do this...
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
# To test the correctness of source backfill, we might need to keep producing data during an interval, to let it go
# through the backfill stage to the forward stage.

statement ok
drop source s0 cascade;
17 changes: 15 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,22 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// Whether the stream source is a cdc source streaming job.
// We need this field to differentiate the cdc source job until we fully implement risingwavelabs/rfcs#72.
// **This field should now be called `is_shared`.** Not renamed for backwards compatibility.
//
// Whether the stream source is a shared source (it has a streaming job).
// This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
//
// Currently, the following sources can be shared:
//
// - Direct CDC sources (mysql & postgresql)
// - MQ sources (Kafka)
bool cdc_source_job = 13;
// Only used when `cdc_source_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
//
// - Direct CDC sources: `false`
// - MQ sources (Kafka): `true`
bool is_distributed = 15;
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ message DropViewResponse {
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support Table jobs that based on other backfill-able sources (risingwavelabs/rfcs#72).
// And one may add other types to support Table jobs that based on other shared sources (risingwavelabs/rfcs#72).
//
// Currently, it's usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
Expand Down
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
}

message SourceBackfillNode {
uint32 source_id = 1;
uint32 upstream_source_id = 1;

Check failure on line 209 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "upstream_source_id" on message "SourceBackfillNode" changed option "json_name" from "sourceId" to "upstreamSourceId".

Check failure on line 209 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" on message "SourceBackfillNode" changed name from "source_id" to "upstream_source_id".
optional uint32 row_id_index = 2;
repeated plan_common.ColumnCatalog columns = 3;
catalog.StreamSourceInfo info = 4;
Expand Down Expand Up @@ -876,13 +876,14 @@
FRAGMENT_TYPE_FLAG_MVIEW = 2;
FRAGMENT_TYPE_FLAG_SINK = 4;
FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
// Include StreamScan and StreamCdcScan
FRAGMENT_TYPE_FLAG_STREAM_SCAN = 16;
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 1024;
FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024;

Check failure on line 886 in proto/stream_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Enum value "1024" on enum "FragmentTypeFlag" changed name from "FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL" to "FRAGMENT_TYPE_FLAG_SOURCE_SCAN".
}

// The streaming context associated with a stream plan
Expand Down
5 changes: 5 additions & 0 deletions scripts/source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
This folder contains scripts to prepare data for testing sources.

## Kafka

`scripts/source/test_data` contains the data. Filename's convention is `<topic_name>.<n_partitions>`.
3 changes: 2 additions & 1 deletion src/common/metrics/src/monitor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -548,7 +549,7 @@ impl<L> tonic::transport::server::Router<L> {
config.tcp_nodelay,
config.keepalive_duration,
)
.unwrap();
.unwrap_or_else(|err| panic!("failed to connect to {listen_addr}: {}", err.as_report()));
let incoming = MonitoredConnection::new(
incoming,
MonitorNewConnectionImpl {
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#![feature(negative_impls)]
#![feature(bound_map)]
#![feature(array_methods)]
#![feature(register_tool)]
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
#![register_tool(rw)]

#[cfg_attr(not(test), allow(unused_extern_crates))]
extern crate self as risingwave_common;
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ pub struct ConfigMap {
#[parameter(default = false)]
background_ddl: bool,

/// Enable shared source. Currently only for Kafka.
///
/// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source
/// will forward the data from the same source streaming job, and also backfill prior data from the external source.
#[parameter(default = false)]
rw_enable_shared_source: bool,
xxchan marked this conversation as resolved.
Show resolved Hide resolved

/// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time.
#[parameter(default = SERVER_ENCODING)]
server_encoding: String,
Expand Down
3 changes: 2 additions & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ macro_rules! impl_set_system_param {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
#[allow(rw::format_error)]
v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ pub fn visit_stream_node_tables_inner<F>(
always!(source.state_table, "FsFetch");
}
}
NodeBody::SourceBackfill(node) => {
always!(node.state_table, "SourceBackfill")
}

// Sink
NodeBody::Sink(node) => {
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
assert_matches = "1"
async-nats = "0.34"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
Expand Down
78 changes: 78 additions & 0 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,84 @@ pub fn build_additional_column_catalog(
Ok(catalog)
}

/// Utility function for adding partition and offset columns to the columns, if not specified by the user.
///
/// ## Returns
/// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`.
/// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns.
pub fn add_partition_offset_cols(
xxchan marked this conversation as resolved.
Show resolved Hide resolved
columns: &[ColumnCatalog],
connector_name: &str,
) -> ([bool; 2], [ColumnCatalog; 2]) {
let mut columns_exist = [false; 2];
let mut last_column_id = columns
.iter()
.map(|c| c.column_desc.column_id)
.max()
.unwrap_or(ColumnId::placeholder());

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
.get(connector_name)
.unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS);
["partition", "file", "offset"]
xxchan marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_catalog(
last_column_id,
connector_name,
col_type,
None,
None,
None,
false,
)
.unwrap(),
)
} else {
None
}
})
.collect()
};
assert_eq!(additional_columns.len(), 2);
xxchan marked this conversation as resolved.
Show resolved Hide resolved
xxchan marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_pb::plan_common::additional_column::ColumnType;
assert_matches::assert_matches!(
additional_columns[0].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
}
);
assert_matches::assert_matches!(
additional_columns[1].column_desc.additional_column,
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
}
);

// Check if partition/file/offset columns are included explicitly.
for col in columns {
match col.column_desc.additional_column {
AdditionalColumn {
column_type: Some(ColumnType::Partition(_) | ColumnType::Filename(_)),
} => {
columns_exist[0] = true;
}
AdditionalColumn {
column_type: Some(ColumnType::Offset(_)),
} => {
columns_exist[1] = true;
}
_ => (),
}
}

(columns_exist, additional_columns.try_into().unwrap())
}

fn build_header_catalog(
column_id: ColumnId,
col_name: &str,
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ pub trait SourceProperties: TryFromHashmap + Clone + WithOptions {
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
fn init_from_pb_source(&mut self, _source: &PbSource) {}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}
}

Expand Down Expand Up @@ -366,10 +368,12 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kinesis(_))
}

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_cdc_source_job = info.cdc_source_job;
self.is_cdc_source_job = info.is_shared();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ impl BrokerAddrRewriter {
role: PrivateLinkContextRole,
broker_rewrite_map: Option<HashMap<String, String>>,
) -> ConnectorResult<Self> {
tracing::info!("[{}] rewrite map {:?}", role, broker_rewrite_map);
let rewrite_map: ConnectorResult<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map
.map_or(Ok(BTreeMap::new()), |addr_map| {
tracing::info!("[{}] rewrite map {:?}", role, addr_map);
xxchan marked this conversation as resolved.
Show resolved Hide resolved
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
addr_map
.into_iter()
.map(|(old_addr, new_addr)| {
Expand Down
Loading
Loading