From c55b13dde420f7feb69df4cb86f8aac9af21ad75 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 12 Mar 2024 15:30:11 +0800
Subject: [PATCH 1/2] refactor: introduce Get and WithPropertiesExt

The same connector-detection helpers (is_kafka_connector,
is_cdc_connector, is_iceberg_connector, is_new_fs_connector) were
duplicated across the connector and frontend crates, once per property
map type. Replace them with a single WithPropertiesExt extension trait,
backed by a small Get abstraction implemented for both
HashMap<String, String> and BTreeMap<String, String>.
---
 src/connector/src/lib.rs                        |  1 +
 src/connector/src/source/base.rs                | 13 +---
 .../src/source/kafka/private_link.rs            | 11 ---
 src/connector/src/with_options.rs               | 72 ++++++++++++++++++-
 .../src/catalog/connection_catalog.rs           | 14 +---
 .../src/handler/alter_source_with_sr.rs         |  4 +-
 src/frontend/src/handler/create_source.rs       | 23 +++---
 src/frontend/src/handler/create_table.rs        |  5 +-
 src/frontend/src/handler/util.rs                | 41 -----------
 .../src/optimizer/plan_node/generic/source.rs   |  8 +--
 src/frontend/src/utils/with_options.rs          | 17 +----
 .../src/from_proto/source/trad_source.rs        |  6 +-
 12 files changed, 99 insertions(+), 116 deletions(-)

diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 4a437755c518..606d3aa8b2e0 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -56,6 +56,7 @@ pub mod common;
 pub use paste::paste;
 
 mod with_options;
+pub use with_options::WithPropertiesExt;
 
 #[cfg(test)]
 mod with_options_test;
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index c0ef0b65f898..52724b170766 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use anyhow::anyhow;
@@ -331,17 +331,6 @@ impl Default for ConnectorProperties {
 }
 
 impl ConnectorProperties {
-    pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap<String, String>) -> bool {
-        with_properties
-            .get(UPSTREAM_SOURCE_KEY)
-            .map(|s| {
-                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
-                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
-                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
-            })
-            .unwrap_or(false)
-    }
-
     pub fn is_new_fs_connector_hash_map(with_properties: &HashMap<String, String>) -> bool {
         with_properties
             .get(UPSTREAM_SOURCE_KEY)
diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs
index 3eebacca09f9..d2e3d6877d16 100644
--- a/src/connector/src/source/kafka/private_link.rs
+++ b/src/connector/src/source/kafka/private_link.rs
@@ -33,7 +33,6 @@ use crate::common::{
 use crate::error::ConnectorResult;
 use crate::source::kafka::stats::RdKafkaStats;
 use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
-use crate::source::KAFKA_CONNECTOR;
 
 pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
 pub const CONNECTION_NAME_KEY: &str = "connection.name";
@@ -205,16 +204,6 @@ fn get_property_required(
         .map_err(Into::into)
 }
 
-#[inline(always)]
-fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
-    const UPSTREAM_SOURCE_KEY: &str = "connector";
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .unwrap_or(&"".to_string())
-        .to_lowercase()
-        .eq_ignore_ascii_case(KAFKA_CONNECTOR)
-}
-
 pub fn insert_privatelink_broker_rewrite_map(
     with_options: &mut BTreeMap<String, String>,
     svc: Option<&PrivateLinkService>,
diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs
index a113c9026fd0..1ce981dd8682 100644
--- a/src/connector/src/with_options.rs
+++ b/src/connector/src/with_options.rs
@@ -12,7 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
+
+use crate::source::iceberg::ICEBERG_CONNECTOR;
+use crate::source::{
+    GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
+};
 
 /// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
 ///
@@ -55,3 +60,68 @@ impl WithOptions for std::time::Duration {}
 impl WithOptions for crate::sink::kafka::CompressionCodec {}
 impl WithOptions for nexmark::config::RateShape {}
 impl WithOptions for nexmark::event::EventType {}
+
+pub trait Get {
+    fn get(&self, key: &str) -> Option<&String>;
+}
+
+impl Get for HashMap<String, String> {
+    fn get(&self, key: &str) -> Option<&String> {
+        self.get(key)
+    }
+}
+
+impl Get for BTreeMap<String, String> {
+    fn get(&self, key: &str) -> Option<&String> {
+        self.get(key)
+    }
+}
+
+/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
+pub trait WithPropertiesExt: Get {
+    #[inline(always)]
+    fn get_connector(&self) -> Option<String> {
+        self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
+    }
+
+    #[inline(always)]
+    fn is_kafka_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector == KAFKA_CONNECTOR
+    }
+
+    #[inline(always)]
+    fn is_cdc_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector.contains("-cdc")
+    }
+
+    #[inline(always)]
+    fn is_iceberg_connector(&self) -> bool {
+        let Some(connector) = self.get_connector() else {
+            return false;
+        };
+        connector == ICEBERG_CONNECTOR
+    }
+
+    fn connector_need_pk(&self) -> bool {
+        // Currently only iceberg connector doesn't need primary key
+        !self.is_iceberg_connector()
+    }
+
+    fn is_new_fs_connector(&self) -> bool {
+        self.get(UPSTREAM_SOURCE_KEY)
+            .map(|s| {
+                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
+                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
+                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
+            })
+            .unwrap_or(false)
+    }
+}
+
+impl<T: Get> WithPropertiesExt for T {}
diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs
index 58595dfbdfd6..7239b72c0d17 100644
--- a/src/frontend/src/catalog/connection_catalog.rs
+++ b/src/frontend/src/catalog/connection_catalog.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 
 use anyhow::anyhow;
 use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
-use risingwave_connector::source::KAFKA_CONNECTOR;
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
 use risingwave_pb::catalog::connection::Info;
 use risingwave_pb::catalog::{connection, PbConnection};
@@ -65,23 +65,13 @@ impl OwnedByUserCatalog for ConnectionCatalog {
     }
 }
 
-#[inline(always)]
-fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
-    const UPSTREAM_SOURCE_KEY: &str = "connector";
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .unwrap_or(&"".to_string())
-        .to_lowercase()
-        .eq_ignore_ascii_case(KAFKA_CONNECTOR)
-}
-
 pub(crate) fn resolve_private_link_connection(
     connection: &Arc<ConnectionCatalog>,
     properties: &mut BTreeMap<String, String>,
 ) -> Result<()> {
     #[allow(irrefutable_let_patterns)]
     if let connection::Info::PrivateLinkService(svc) = &connection.info {
-        if !is_kafka_connector(properties) {
+        if properties.is_kafka_connector() {
             return Err(RwError::from(anyhow!(
                 "Private link is only supported for Kafka connector"
             )));
diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs
index fc35552270a2..1718432b70aa 100644
--- a/src/frontend/src/handler/alter_source_with_sr.rs
+++ b/src/frontend/src/handler/alter_source_with_sr.rs
@@ -18,6 +18,7 @@ use itertools::Itertools;
 use pgwire::pg_response::StatementType;
 use risingwave_common::bail_not_implemented;
 use risingwave_common::catalog::ColumnCatalog;
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::StreamSourceInfo;
 use risingwave_pb::plan_common::{EncodeType, FormatType};
 use risingwave_sqlparser::ast::{
@@ -28,7 +29,6 @@ use risingwave_sqlparser::parser::Parser;
 
 use super::alter_table_column::schema_has_schema_registry;
 use super::create_source::{bind_columns_from_source, validate_compatibility};
-use super::util::is_cdc_connector;
 use super::{HandlerArgs, RwPgResponse};
 use crate::catalog::root_catalog::SchemaPath;
 use crate::catalog::source_catalog::SourceCatalog;
@@ -152,7 +152,7 @@ pub async fn refresh_sr_and_get_columns_diff(
         .collect();
     validate_compatibility(connector_schema, &mut with_properties)?;
 
-    if is_cdc_connector(&with_properties) {
+    if with_properties.is_cdc_connector() {
         bail_not_implemented!("altering a cdc source is not supported");
     }
 
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index cba138268f06..620cd2e06687 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -52,6 +52,7 @@ use risingwave_connector::source::{
     KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
     PULSAR_CONNECTOR, S3_CONNECTOR,
 };
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::{
     PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
 };
@@ -75,10 +76,7 @@ use crate::handler::create_table::{
     bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names,
     ensure_table_constraints_supported, ColumnIdGenerator,
 };
-use crate::handler::util::{
-    connector_need_pk, get_connector, is_cdc_connector, is_iceberg_connector, is_kafka_connector,
-    SourceSchemaCompatExt,
-};
+use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
 use crate::optimizer::plan_node::{LogicalSource, ToStream, ToStreamContext};
@@ -298,7 +296,7 @@ pub(crate) async fn bind_columns_from_source(
     const KEY_MESSAGE_NAME_KEY: &str = "key.message";
     const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
 
-    let is_kafka: bool = is_kafka_connector(with_properties);
+    let is_kafka: bool = with_properties.is_kafka_connector();
     let format_encode_options = WithOptions::try_from(source_schema.row_options())?.into_inner();
     let mut format_encode_options_to_consume = format_encode_options.clone();
 
@@ -447,7 +445,7 @@
             .await?
         }
         (Format::None, Encode::None) => {
-            if is_iceberg_connector(with_properties) {
+            if with_properties.is_iceberg_connector() {
                 Some(
                     extract_iceberg_columns(with_properties)
                         .await
@@ -533,7 +531,7 @@ pub fn handle_addition_columns(
     mut additional_columns: IncludeOption,
     columns: &mut Vec<ColumnCatalog>,
 ) -> Result<()> {
-    let connector_name = get_connector(with_properties).unwrap(); // there must be a connector in source
+    let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source
 
     if COMPATIBLE_ADDITIONAL_COLUMNS
         .get(connector_name.as_str())
@@ -878,7 +876,7 @@ fn check_and_add_timestamp_column(
     with_properties: &HashMap<String, String>,
     columns: &mut Vec<ColumnCatalog>,
 ) {
-    if is_kafka_connector(with_properties) {
+    if with_properties.is_kafka_connector() {
         if columns.iter().any(|col| {
             matches!(
                 col.column_desc.additional_column.column_type,
@@ -1024,7 +1022,8 @@ pub fn validate_compatibility(
     source_schema: &ConnectorSchema,
     props: &mut HashMap<String, String>,
 ) -> Result<()> {
-    let connector = get_connector(props)
+    let connector = props
+        .get_connector()
         .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
 
     let compatible_formats = CONNECTORS_COMPATIBLE_FORMATS
@@ -1102,7 +1101,7 @@ pub(super) async fn check_source_schema(
     row_id_index: Option<usize>,
     columns: &[ColumnCatalog],
 ) -> Result<()> {
-    let Some(connector) = get_connector(props) else {
+    let Some(connector) = props.get_connector() else {
         return Ok(());
     };
 
@@ -1306,7 +1305,7 @@ pub async fn handle_create_source(
     let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;
 
     // gated the feature with a session variable
-    let create_cdc_source_job = if is_cdc_connector(&with_properties) {
+    let create_cdc_source_job = if with_properties.is_cdc_connector() {
         CdcTableType::from_properties(&with_properties).can_backfill()
     } else {
         false
@@ -1362,7 +1361,7 @@ pub async fn handle_create_source(
             .into());
         }
     let (mut columns, pk_column_ids, row_id_index) =
-        bind_pk_on_relation(columns, pk_names, connector_need_pk(&with_properties))?;
+        bind_pk_on_relation(columns, pk_names, with_properties.connector_need_pk())?;
 
     debug_assert!(is_column_ids_dedup(&columns));
 
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 493cfe967f3d..3eb6b5279293 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -29,11 +29,11 @@ use risingwave_common::catalog::{
 };
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_common::util::value_encoding::DatumToProtoExt;
-use risingwave_connector::source;
 use risingwave_connector::source::cdc::external::{
     DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
 use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
+use risingwave_connector::{source, WithPropertiesExt};
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
@@ -61,7 +61,6 @@ use crate::handler::create_source::{
     bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark,
     check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
 };
-use crate::handler::util::is_iceberg_connector;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
 use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
@@ -514,7 +513,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
         c.column_desc.column_id = col_id_gen.generate(c.name())
     }
 
-    if is_iceberg_connector(&with_properties) {
+    if with_properties.is_iceberg_connector() {
         return Err(
             ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
         );
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 6c9a9bb45f2a..d3ccb55e6a6a 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -31,12 +30,9 @@ use risingwave_common::catalog::Field;
 use risingwave_common::row::Row as _;
 use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
-use risingwave_connector::source::KAFKA_CONNECTOR;
 use risingwave_sqlparser::ast::{CompatibleSourceSchema, ConnectorSchema};
 
 use crate::error::{ErrorCode, Result as RwResult};
-use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
 use crate::session::{current, SessionImpl};
 
 pin_project! {
@@ -180,43 +176,6 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor {
     )
 }
 
-pub fn connector_need_pk(with_properties: &HashMap<String, String>) -> bool {
-    // Currently only iceberg connector doesn't need primary key
-    !is_iceberg_connector(with_properties)
-}
-
-#[inline(always)]
-pub fn get_connector(with_properties: &HashMap<String, String>) -> Option<String> {
-    with_properties
-        .get(UPSTREAM_SOURCE_KEY)
-        .map(|s| s.to_lowercase())
-}
-
-#[inline(always)]
-pub fn is_kafka_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-
-    connector == KAFKA_CONNECTOR
-}
-
-#[inline(always)]
-pub fn is_cdc_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-    connector.contains("-cdc")
-}
-
-#[inline(always)]
-pub fn is_iceberg_connector(with_properties: &HashMap<String, String>) -> bool {
-    let Some(connector) = get_connector(with_properties) else {
-        return false;
-    };
-    connector == ICEBERG_CONNECTOR
-}
-
 #[easy_ext::ext(SourceSchemaCompatExt)]
 impl CompatibleSourceSchema {
     /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated.
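A note on the `Get` impls added in src/connector/src/with_options.rs above: the body `self.get(key)` inside `impl Get for HashMap<String, String>` is not infinite recursion. Rust resolves method calls to inherent methods before trait methods, so the call dispatches to `HashMap::get` rather than back into `Get::get`. A minimal stand-alone sketch of just this point (the map contents are illustrative values, not anything from the patch):

    use std::collections::HashMap;

    trait Get {
        fn get(&self, key: &str) -> Option<&String>;
    }

    impl Get for HashMap<String, String> {
        fn get(&self, key: &str) -> Option<&String> {
            // Dispatches to the inherent `HashMap::get` (inherent methods
            // take precedence over trait methods), so this terminates.
            self.get(key)
        }
    }

    fn main() {
        let mut props = HashMap::new();
        props.insert("connector".to_owned(), "kafka".to_owned());
        // Calling through the trait still works, via the impl above.
        assert_eq!(Get::get(&props, "connector").map(String::as_str), Some("kafka"));
    }

The same reasoning applies to the `BTreeMap<String, String>` impl.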
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index 04e63a246a7a..406a3654def2 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -20,7 +20,7 @@ use educe::Educe;
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_connector::source::ConnectorProperties;
+use risingwave_connector::WithPropertiesExt;
 
 use super::super::utils::TableCatalogBuilder;
 use super::GenericPlanNode;
@@ -99,9 +99,9 @@ impl GenericPlanNode for Source {
 
 impl Source {
     pub fn is_new_fs_connector(&self) -> bool {
-        self.catalog.as_ref().is_some_and(|catalog| {
-            ConnectorProperties::is_new_fs_connector_b_tree_map(&catalog.with_properties)
-        })
+        self.catalog
+            .as_ref()
+            .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
     }
 
     /// The columns in stream/batch source node indicate the actual columns it will produce,
diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs
index 633bcf29354f..574cab7d6497 100644
--- a/src/frontend/src/utils/with_options.rs
+++ b/src/frontend/src/utils/with_options.rs
@@ -19,16 +19,15 @@ use std::num::NonZeroU32;
 use risingwave_connector::source::kafka::{
     insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
 };
-use risingwave_connector::source::KAFKA_CONNECTOR;
+use risingwave_connector::WithPropertiesExt;
 use risingwave_sqlparser::ast::{
     CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, SqlOption, Statement,
     Value,
 };
 
 use crate::catalog::connection_catalog::resolve_private_link_connection;
 use crate::catalog::ConnectionId;
 use crate::error::{ErrorCode, Result as RwResult, RwError};
-use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
 use crate::session::SessionImpl;
 
 mod options {
@@ -113,24 +112,12 @@ impl WithOptions {
     }
 }
 
-#[inline(always)]
-fn is_kafka_connector(with_options: &WithOptions) -> bool {
-    let Some(connector) = with_options
-        .inner()
-        .get(UPSTREAM_SOURCE_KEY)
-        .map(|s| s.to_lowercase())
-    else {
-        return false;
-    };
-    connector == KAFKA_CONNECTOR
-}
-
 pub(crate) fn resolve_privatelink_in_with_option(
     with_options: &mut WithOptions,
     schema_name: &Option<String>,
     session: &SessionImpl,
 ) -> RwResult<Option<ConnectionId>> {
-    let is_kafka = is_kafka_connector(with_options);
+    let is_kafka = with_options.is_kafka_connector();
     let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
 
     // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 02fb68b1446c..53661d87b20c 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -19,8 +19,9 @@ use risingwave_common::catalog::{
 };
 use risingwave_connector::source::reader::desc::SourceDescBuilder;
 use risingwave_connector::source::{
-    should_copy_to_format_encode_options, ConnectorProperties, SourceCtrlOpts, UPSTREAM_SOURCE_KEY,
+    should_copy_to_format_encode_options, SourceCtrlOpts, UPSTREAM_SOURCE_KEY,
 };
+use risingwave_connector::WithPropertiesExt;
 use risingwave_pb::catalog::PbStreamSourceInfo;
 use risingwave_pb::data::data_type::TypeName as PbTypeName;
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
@@ -208,8 +209,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                 .map(|c| c.to_ascii_lowercase())
                 .unwrap_or_default();
             let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str());
-            let is_fs_v2_connector =
-                ConnectorProperties::is_new_fs_connector_hash_map(&source.with_properties);
+            let is_fs_v2_connector = source.with_properties.is_new_fs_connector();
 
             if is_fs_connector {
                 #[expect(deprecated)]

From 287d40f9895963a2190acdcec369db31771c25ec Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 12 Mar 2024 16:07:23 +0800
Subject: [PATCH 2/2] fix: restore dropped negation in private link Kafka
 connector check

The refactor in the previous commit dropped the `!` in
resolve_private_link_connection, inverting the "is this a Kafka
connector" check.
---
 src/frontend/src/catalog/connection_catalog.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs
index 7239b72c0d17..54e1210979fe 100644
--- a/src/frontend/src/catalog/connection_catalog.rs
+++ b/src/frontend/src/catalog/connection_catalog.rs
@@ -71,7 +71,7 @@ pub(crate) fn resolve_private_link_connection(
 ) -> Result<()> {
     #[allow(irrefutable_let_patterns)]
     if let connection::Info::PrivateLinkService(svc) = &connection.info {
-        if properties.is_kafka_connector() {
+        if !properties.is_kafka_connector() {
             return Err(RwError::from(anyhow!(
                 "Private link is only supported for Kafka connector"
             )));
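With both patches applied, connector detection for every property-map type funnels through one trait. The following is a condensed, compilable sketch of the pattern the series introduces, not the crate's actual code: the two constants are local stand-ins for `risingwave_connector`'s `UPSTREAM_SOURCE_KEY` and `KAFKA_CONNECTOR`, and `WithPropertiesExt` is trimmed to two of its methods:

    use std::collections::{BTreeMap, HashMap};

    // Stand-ins for the constants exported by the connector crate.
    const UPSTREAM_SOURCE_KEY: &str = "connector";
    const KAFKA_CONNECTOR: &str = "kafka";

    trait Get {
        fn get(&self, key: &str) -> Option<&String>;
    }

    impl Get for HashMap<String, String> {
        fn get(&self, key: &str) -> Option<&String> {
            self.get(key) // inherent `HashMap::get`
        }
    }

    impl Get for BTreeMap<String, String> {
        fn get(&self, key: &str) -> Option<&String> {
            self.get(key) // inherent `BTreeMap::get`
        }
    }

    trait WithPropertiesExt: Get {
        fn get_connector(&self) -> Option<String> {
            self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
        }
        fn is_kafka_connector(&self) -> bool {
            self.get_connector().map_or(false, |c| c == KAFKA_CONNECTOR)
        }
    }

    // One blanket impl covers every `Get` implementor, so the per-map
    // helper pairs (`is_new_fs_connector_hash_map` /
    // `is_new_fs_connector_b_tree_map`) collapse into a single method.
    impl<T: Get> WithPropertiesExt for T {}

    fn main() {
        let hash: HashMap<String, String> =
            [("connector".to_owned(), "KAFKA".to_owned())].into();
        let btree: BTreeMap<String, String> =
            [("connector".to_owned(), "kafka".to_owned())].into();
        assert!(hash.is_kafka_connector()); // case-insensitive via to_lowercase
        assert!(btree.is_kafka_connector());
    }

Frontend call sites mostly hold `HashMap<String, String>` while the catalog and private-link paths use `BTreeMap<String, String>`; the blanket impl is what lets one method body serve both, which is exactly what allowed the duplicated free functions to be deleted.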