feat: Connection for Kafka source & sink #19270

Merged: 70 commits, Nov 28, 2024
Commits
b9b2f79
share kafka client on meta
tabVersion Oct 22, 2024
ab46672
check kafka connection identical
tabVersion Oct 22, 2024
3373a58
fix
tabVersion Oct 22, 2024
c5203d6
fix with props
tabVersion Oct 22, 2024
3b6a6a2
fix
tabVersion Oct 22, 2024
3b3f725
use connection hash as hashmap entry
tabVersion Oct 22, 2024
968ed08
fix
tabVersion Oct 22, 2024
a41f3fc
fix
Oct 23, 2024
6534ebf
rerun
tabVersion Oct 23, 2024
7249e78
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
tabVersion Oct 23, 2024
0024e1d
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 23, 2024
ad8b989
fix
tabVersion Oct 23, 2024
ae1b70a
fix
tabVersion Oct 23, 2024
58b5128
better with options
Oct 25, 2024
f115a0c
use kafka connection as hashkey
Oct 25, 2024
d128644
use moka
Oct 25, 2024
ae9df41
fix lint
Oct 25, 2024
45295bc
fix
Oct 25, 2024
a9e34c7
fix
Oct 25, 2024
d27ab90
Merge branch 'main' into tab/share-kafka-client-enum
tabVersion Oct 25, 2024
256485c
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
Oct 28, 2024
62cb953
Merge branch 'tab/share-kafka-client-enum' of https://github.com/risi…
Oct 28, 2024
725e23c
remove get hash func
Oct 28, 2024
832f66f
migrate to Weak
Oct 28, 2024
73f0b7b
minor
Oct 28, 2024
ac1d63d
fix
Oct 28, 2024
35fb002
Merge remote-tracking branch 'origin' into tab/share-kafka-client-enum
tabVersion Oct 29, 2024
ec49096
test bump quanta to 0.12.3
tabVersion Oct 29, 2024
16d8c42
update patch
tabVersion Oct 29, 2024
b3efda6
moka 0.12.3
Oct 30, 2024
6a729f5
update proto
Nov 1, 2024
fbc9917
Merge remote-tracking branch 'origin' into tab/connection
Nov 1, 2024
a79d5da
stash
Nov 2, 2024
a561ea3
stash
Nov 2, 2024
2d7ec88
Merge remote-tracking branch 'origin' into tab/connection
Nov 2, 2024
3a18c4c
stash
Nov 3, 2024
60c09fd
basic
Nov 4, 2024
e9f8d72
handle secret ref
Nov 5, 2024
23b2011
Merge remote-tracking branch 'origin' into tab/connection
Nov 5, 2024
673bccb
stash
Nov 6, 2024
635975d
fix
Nov 6, 2024
0fd3972
Merge remote-tracking branch 'origin' into tab/connection
Nov 7, 2024
0e01a05
Merge remote-tracking branch 'origin' into tab/connection
Nov 12, 2024
01363ec
macro expansion
Nov 15, 2024
94f730e
fix compile
wenym1 Nov 15, 2024
5baccf3
dep graph on connection
Nov 15, 2024
f587e8b
Merge remote-tracking branch 'origin' into tab/connection
Nov 15, 2024
1da1d1a
merge fix
Nov 15, 2024
403868e
connection dep for sink
Nov 15, 2024
aaa6a34
Merge remote-tracking branch 'origin' into tab/connection
Nov 17, 2024
1d2cb3d
stash
Nov 18, 2024
45a9b8d
Merge remote-tracking branch 'origin' into tab/connection
Nov 18, 2024
cc75ea7
add slt test
Nov 18, 2024
a446707
fix
Nov 18, 2024
e871ab7
fmt
Nov 18, 2024
eb371d5
fix
Nov 18, 2024
99b2094
fix
Nov 18, 2024
795a79d
resolve comments
tabVersion Nov 20, 2024
c26c05b
Merge remote-tracking branch 'origin' into tab/connection
tabVersion Nov 20, 2024
64a4e26
Merge remote-tracking branch 'origin' into tab/connection
Nov 25, 2024
28bb651
rename migration
Nov 25, 2024
9c0e9df
refactor: make object ref outside `ast::Value` (#19559)
tabVersion Nov 25, 2024
e312ef2
Merge branch 'main' into tab/connection
tabVersion Nov 26, 2024
d24fafb
rerun
Nov 26, 2024
dc3fd25
fix catalog proto
Nov 26, 2024
401660b
Revert "fix catalog proto"
Nov 26, 2024
08fb575
change syntax
Nov 26, 2024
48a28a2
Merge remote-tracking branch 'origin' into tab/connection
Nov 26, 2024
d5be997
Merge remote-tracking branch 'origin' into tab/connection
Nov 28, 2024
2f0a4a8
test
Nov 28, 2024
6 changes: 5 additions & 1 deletion Cargo.toml
@@ -160,7 +160,11 @@ arrow-udf-flight = "0.4"
clap = { version = "4", features = ["cargo", "derive", "env"] }
# Use a forked version which removes the dependencies on dynamo db to reduce
# compile time and binary size.
deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] }
deltalake = { version = "0.20.1", features = [
"s3",
"gcs",
"datafusion",
] }
itertools = "0.13.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
107 changes: 107 additions & 0 deletions e2e_test/source_inline/connection/ddl.slt
@@ -0,0 +1,107 @@
control substitution on

# for non-shared source
statement ok
set streaming_use_shared_source to false;

statement ok
create secret sec_broker with (backend = 'meta') as '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}';

statement error unknown field `foo`
create connection conn with (type = 'kafka', properties.bootstrap.server = secret sec_broker, foo = 'bar');

statement error Connection type "kinesis" is not supported
create connection conn with (type = 'kinesis');

statement ok
create connection conn with (type = 'kafka', properties.bootstrap.server = secret sec_broker, properties.security.protocol = 'plaintext');

query TTT
select "name", "type_" from rw_catalog.rw_connections;
----
conn CONNECTION_TYPE_KAFKA

# unstable test serialization due to iter on hashmap
# the "connectiond_params" looks like:
# {"properties.bootstrap.server":"SECRET sec_broker AS TEXT","properties.security.protocol":"plaintext"}

statement error Permission denied: PermissionDenied: secret used by 1 other objects.
drop secret sec_broker;

statement error Duplicated key in both WITH clause and Connection catalog: properties.security.protocol
create table t1 (a int, b varchar) with (
connector = 'kafka',
connection = conn,
topic = 'connection_ddl_1',
properties.security.protocol = 'plaintext')
format plain encode json;

statement error connector kinesis and connection type Kafka are not compatible
create table t1 (a int, b varchar) with (
connector = 'kinesis',
connection = conn,
stream = 'connection_ddl_1',
region = 'us-east-1')
format plain encode json;

system ok
rpk topic create connection_ddl_1 -p 1

statement ok
create table t1 (a int, b varchar) with (
connector = 'kafka',
connection = conn,
topic = 'connection_ddl_1')
format plain encode json;

statement error Permission denied: PermissionDenied: connection used by 1 other objects.
drop connection conn;

# Connection & Source & Sink will have independent rely on the secret
statement error Permission denied: PermissionDenied: secret used by 2 other objects.
drop secret sec_broker;

statement ok
create table data_table (a int, b varchar);

statement ok
insert into data_table values (1, 'a'), (2, 'b'), (3, 'c');

statement ok
flush;

statement ok
create sink sink_kafka from data_table with (
connector = 'kafka',
connection = conn,
topic = 'connection_ddl_1'
) format plain encode json (
force_append_only='true'
);

sleep 3s

query IT rowsort
select a, b from t1;
----
1 a
2 b
3 c

statement ok
drop sink sink_kafka

statement ok
drop table data_table;

statement ok
drop table t1;

statement ok
drop connection conn;

statement ok
drop secret sec_broker;

statement ok
set streaming_use_shared_source to true;
24 changes: 24 additions & 0 deletions proto/catalog.proto
@@ -92,6 +92,9 @@ message StreamSourceInfo {
// Handles the case where the source relies on any secret. The key is the property name and the value is the secret id and type.
// For format and encode options.
map<string, secret.SecretRef> format_encode_secret_refs = 16;

// ref connection for schema registry
optional uint32 connection_id = 17;
}

message WebhookSourceInfo {
@@ -128,6 +131,8 @@ message Source {
uint32 associated_table_id = 12;
}
string definition = 13;

// ref connection for connector
optional uint32 connection_id = 14;
Contributor:
So we just reuse Source.connection_id?

Contributor Author:
Yes, and the same goes for the schema registry part and the sink.


optional uint64 initialized_at_epoch = 15;
@@ -161,6 +166,9 @@ message SinkFormatDesc {
optional plan_common.EncodeType key_encode = 4;
// Secret used for format encode options.
map<string, secret.SecretRef> secret_refs = 5;

// ref connection for schema registry
optional uint32 connection_id = 6;
Member:
I think this should be added to SinkParam & SinkDesc instead, side-by-side with the Sink's properties.

Contributor Author:
Let me clarify the logic for the related proto defs:

  • Sink (proto/catalog.proto): the catalog definition for every sink; we read the props and resolve the connection dependency based on this message. The message contains SinkFormatDesc, and we already have a connection_id field here.
    • We do resolve the connection ref when creating the sink, so the props in Sink.properties contain the ones from the connection catalog.
  • SinkFormatDesc (proto/catalog.proto): specifies the format ... encode ... options (and is contained in the sink catalog). We ref a connection here for the schema registry.
  • SinkDesc (proto/stream_plan.proto): defines what fields we dispatch to CNs. But we have already resolved the props from the connection definition in meta, so there is no need to change things here.
  • SinkParam (proto/connector_service.proto): seems to define the transport protocol between the kernel and the connector node (mostly referenced in Java). Ditto: this step happens after the meta node, so no change is needed.

Member (@fuyufjh, Nov 20, 2024):
We do resolve the connection ref when creating the sink, so the props in Sink.properties contain the ones from the connection catalog.

I see. So here we expand all the connections into concrete parameters on creation, right? This is okay with me, but in that case I think connection_id here in SinkFormatDesc should not appear either, because all connection-related things are already eliminated on creation.

(It seems SinkFormatDesc::connection_id was indeed not used anywhere?)

Contributor Author:

Similar to the source side, the connector part and the schema registry part will have independent connections, just like they have individual secret_refs.

We are offering syntax like:

create source s(...) with (
  connector = 'kafka',
  connection = <connector connection>, ...
) format ... encode ... (
  connection = <schema registry connection / glue connection>
)

Member:

Yeah, I know. What I mean is: should it be resolved and eliminated before converting into a proto message?

We do resolve the connection ref when creating the sink, so the props in Sink.properties contain the ones from the connection catalog.

Contributor Author (@tabVersion, Nov 21, 2024):

Yeah, I know. What I mean is: should it be resolved and eliminated before converting into a proto message?

We resolve the props in the frontend, and the catalog is persisted in meta. We have to reserve a field to keep the connection_id in order to maintain the dependency. And to keep the design simple and aligned with the secret ref, I think adding a connection_id for each connector and schema registry is the best practice.

Also, we are going to support ALTER CONNECTION and apply the changes to all related sources/sinks. Eliminating the connection_id in the catalog would make that step harder.

Member:

I see. You keep both connection_id and the resolved connection arguments in these message structures, right? It's acceptable to me but a bit counter-intuitive.

And to keep the design simple and aligned with the secret ref,

IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.

Contributor Author:

IIRC, secret ref doesn't keep the resolved plaintext secret at all. It always resolves whenever using a secret.

Oh, there is a bit of a gap around what resolve means here.
You can refer to resolve_connection_ref_and_secret_ref: resolve here means extracting xxx = secret <secret name> from the WITH props into a PbSecretRef (secret_id). We do the same operation for secrets coming from both the connection catalog and the WITH props.
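
To make that concrete for readers outside this thread, here is a minimal, purely illustrative sketch of the resolve step being described. It is not the real resolve_connection_ref_and_secret_ref (whose signature is not shown in this PR); the helper name, the string-prefix check, and the lookup map are all assumptions for illustration.

use std::collections::BTreeMap;

// Stand-in for the proto type; only the field we care about here.
struct PbSecretRef {
    secret_id: u32,
}

// Split raw WITH options into plain options plus secret references, e.g.
// "properties.bootstrap.server" = "SECRET sec_broker" becomes an entry in the
// secret_refs map instead of staying as a plaintext value.
fn resolve_secret_refs(
    raw_options: BTreeMap<String, String>,
    secret_name_to_id: &BTreeMap<String, u32>,
) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
    let mut plain = BTreeMap::new();
    let mut secret_refs = BTreeMap::new();
    for (key, value) in raw_options {
        if let Some(name) = value.strip_prefix("SECRET ") {
            // `key = secret <name>` is recorded as a reference by secret id ...
            let secret_id = secret_name_to_id[name.trim()];
            secret_refs.insert(key, PbSecretRef { secret_id });
        } else {
            // ... while ordinary options stay as plain strings.
            plain.insert(key, value);
        }
    }
    (plain, secret_refs)
}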

}

// the catalog of the sink. There are two kinds of schema here. The full schema is all columns
@@ -183,6 +191,8 @@ message Sink {
uint32 owner = 11;
map<string, string> properties = 12;
string definition = 13;

// ref connection for connector
optional uint32 connection_id = 14;
optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;
@@ -231,6 +241,19 @@ message Subscription {
SubscriptionState subscription_state = 19;
}

message ConnectionParams {
Member:

This is kind of a weakly-typed implementation, as the properties are kept as a map<string,string> here.

On the contrary, we could optionally make it strongly-typed by defining the connection's fields as a message (such as message KafkaConnection), and meanwhile the corresponding struct (such as struct KafkaConnectionInner) would be substituted.

I tend to prefer the latter, but I am not sure how to handle secrets. Comments are welcome.

Contributor Author:

I had this idea before.
If we go with a concrete type here, we have to accept a more complex impl when handling create source/sink/table, i.e. we have to know whether an attr comes from the WITH clause or the connection catalog and make sure they don't collide. Besides, secret refs are a problem.

The current impl avoids the former problem by merging everything into a hashmap, and keeps the original path for creating source/sink/table unchanged. A rough sketch of that merging step is shown below.
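
As a rough sketch of that map-merging approach (the function name and exact error path are illustrative, not the actual frontend code; the duplicate-key message mirrors the one exercised in ddl.slt above):

use std::collections::BTreeMap;

use anyhow::bail;

// Merge WITH-clause options with the options stored in the connection catalog,
// rejecting keys that appear in both places.
fn merge_with_connection_options(
    mut with_options: BTreeMap<String, String>,
    connection_options: BTreeMap<String, String>,
) -> anyhow::Result<BTreeMap<String, String>> {
    for (key, value) in connection_options {
        if with_options.contains_key(&key) {
            bail!("Duplicated key in both WITH clause and Connection catalog: {key}");
        }
        with_options.insert(key, value);
    }
    // The merged map then flows down the unchanged create source/sink/table
    // path and is deserialized into the concrete property structs
    // (e.g. KafkaConnectionProps) as before.
    Ok(with_options)
}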

enum ConnectionType {
CONNECTION_TYPE_UNSPECIFIED = 0;
CONNECTION_TYPE_KAFKA = 1;
CONNECTION_TYPE_ICEBERG = 2;
CONNECTION_TYPE_SCHEMA_REGISTRY = 3;
}

ConnectionType connection_type = 1;
map<string, string> properties = 2;
map<string, secret.SecretRef> secret_refs = 3;
}

message Connection {
message PrivateLinkService {
enum PrivateLinkProvider {
@@ -251,6 +274,7 @@
string name = 4;
oneof info {
Member:

No need to keep the oneof now. May remove it to improve readability.

Contributor Author:

We are still using the proto in meta backup. Removing the oneof might break compatibility.

Member:

Well, then let's place connection_params outside the oneof, at least.

PrivateLinkService private_link_service = 5 [deprecated = true];
ConnectionParams connection_params = 7;
}
uint32 owner = 6;
}
3 changes: 2 additions & 1 deletion proto/ddl_service.proto
@@ -442,7 +442,8 @@ message CreateConnectionRequest {
uint32 database_id = 2;
uint32 schema_id = 3;
oneof payload {
PrivateLink private_link = 4;
PrivateLink private_link = 4 [deprecated = true];
catalog.ConnectionParams connection_params = 6;
}
uint32 owner_id = 5;
}
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -487,6 +487,7 @@ fn mock_from_legacy_type(
options: Default::default(),
secret_refs: Default::default(),
key_encode: None,
connection_id: None,
}))
} else {
SinkFormatDesc::from_legacy_type(connector, r#type)
32 changes: 29 additions & 3 deletions src/connector/src/connector_common/common.rs
@@ -163,7 +163,7 @@ impl AwsAuthProps {

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnection {
pub struct KafkaConnectionProps {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

@@ -244,6 +244,7 @@ pub struct KafkaConnection {
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
// connection related props are moved to `KafkaConnection`
#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

@@ -256,7 +257,7 @@ pub struct KafkaCommon {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
@@ -321,7 +322,32 @@ impl RdKafkaPropertiesCommon {
}
}

impl KafkaConnection {
impl KafkaConnectionProps {
#[cfg(test)]
pub fn test_default() -> Self {
Self {
brokers: "localhost:9092".to_string(),
security_protocol: None,
ssl_ca_location: None,
ssl_certificate_location: None,
ssl_key_location: None,
ssl_ca_pem: None,
ssl_certificate_pem: None,
ssl_key_pem: None,
ssl_key_password: None,
ssl_endpoint_identification_algorithm: None,
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
sasl_kerberos_service_name: None,
sasl_kerberos_keytab: None,
sasl_kerberos_principal: None,
sasl_kerberos_kinit_cmd: None,
sasl_kerberos_min_time_before_relogin: None,
sasl_oathbearer_config: None,
}
}

pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
128 changes: 128 additions & 0 deletions src/connector/src/connector_common/connection.rs
@@ -0,0 +1,128 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::ClientConfig;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::catalog::PbConnection;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::async_trait;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, KafkaConnectionProps, KafkaPrivateLinkCommon};
use crate::error::ConnectorResult;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
use crate::{dispatch_connection_impl, ConnectionImpl};

#[async_trait]
pub trait Connection {
async fn test_connection(&self) -> ConnectorResult<()>;
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct KafkaConnection {
#[serde(flatten)]
pub inner: KafkaConnectionProps,
#[serde(flatten)]
pub kafka_private_link_common: KafkaPrivateLinkCommon,
#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,
}

pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<()> {
if let Some(ref info) = connection.info {
match info {
risingwave_pb::catalog::connection::Info::ConnectionParams(cp) => {
let options = cp.properties.clone().into_iter().collect();
let secret_refs = cp.secret_refs.clone().into_iter().collect();
let props_secret_resolved =
LocalSecretManager::global().fill_secrets(options, secret_refs)?;
let connection_impl =
ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?;
dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?)
}
risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(),
}
}
Ok(())
}

#[async_trait]
impl Connection for KafkaConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
let client = self.build_client().await?;
// describe cluster here
client.fetch_metadata(None, Duration::from_secs(10)).await?;
Ok(())
}
}

impl KafkaConnection {
async fn build_client(&self) -> ConnectorResult<BaseConsumer<RwConsumerContext>> {
let mut config = ClientConfig::new();
let bootstrap_servers = &self.inner.brokers;
let broker_rewrite_map = self.kafka_private_link_common.broker_rewrite_map.clone();
config.set("bootstrap.servers", bootstrap_servers);
self.inner.set_security_properties(&mut config);

// dup with Kafka Enumerator
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
self.aws_auth_props.clone(),
self.inner.is_aws_msk_iam(),
)
.await?;
let client_ctx = RwConsumerContext::new(ctx_common);
let client: BaseConsumer<RwConsumerContext> =
config.create_with_context(client_ctx).await?;
if self.inner.is_aws_msk_iam() {
#[cfg(not(madsim))]
client.poll(Duration::from_secs(10)); // note: this is a blocking call
#[cfg(madsim)]
client.poll(Duration::from_secs(10)).await;
}
Ok(client)
}
}

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergConnection {}

#[async_trait]
impl Connection for IcebergConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
todo!()
}
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
#[serde(deny_unknown_fields)]
pub struct SchemaRegistryConnection {}

#[async_trait]
impl Connection for SchemaRegistryConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
todo!()
}
}
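
As a usage-level illustration of the new API (not part of the diff): the sketch below shows how a caller, e.g. the meta node's DDL handler, might drive validate_connection for a newly created Kafka connection. The crate/module import paths and the prost-generated field names are assumptions inferred from the proto and Rust code above, and the call assumes LocalSecretManager has been initialized.

// Hypothetical caller-side sketch; paths and names are assumed, not verbatim.
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorResult;
use risingwave_pb::catalog::connection::Info;
use risingwave_pb::catalog::connection_params::ConnectionType;
use risingwave_pb::catalog::{ConnectionParams, PbConnection};

async fn check_new_kafka_connection() -> ConnectorResult<()> {
    // Properties as they would arrive from CREATE CONNECTION ... WITH (...);
    // secret-backed options would go into `secret_refs` instead (empty here).
    let params = ConnectionParams {
        connection_type: ConnectionType::Kafka as i32,
        properties: [
            (
                "properties.bootstrap.server".to_owned(),
                "localhost:9092".to_owned(),
            ),
            (
                "properties.security.protocol".to_owned(),
                "plaintext".to_owned(),
            ),
        ]
        .into_iter()
        .collect(),
        secret_refs: Default::default(),
    };

    let conn = PbConnection {
        name: "conn".to_owned(),
        info: Some(Info::ConnectionParams(params)),
        ..Default::default()
    };

    // Internally builds a Kafka client and fetches metadata; this fails if the
    // broker is unreachable, mirroring the e2e `create connection` test above.
    validate_connection(&conn).await
}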