diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index bb531a79a3f9..3cb1ec678977 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -40,7 +40,7 @@ use common_telemetry::info;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
 use common_time::timezone::set_default_timezone;
 use common_version::{short_version, version};
-use common_wal::config::StandaloneWalConfig;
+use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
 use datanode::datanode::{Datanode, DatanodeBuilder};
 use file_engine::config::EngineConfig as FileEngineConfig;
@@ -130,7 +130,7 @@ pub struct StandaloneOptions {
     pub opentsdb: OpentsdbOptions,
     pub influxdb: InfluxdbOptions,
     pub prom_store: PromStoreOptions,
-    pub wal: StandaloneWalConfig,
+    pub wal: DatanodeWalConfig,
     pub storage: StorageConfig,
     pub metadata_store: KvBackendConfig,
     pub procedure: ProcedureConfig,
@@ -155,7 +155,7 @@ impl Default for StandaloneOptions {
             opentsdb: OpentsdbOptions::default(),
             influxdb: InfluxdbOptions::default(),
             prom_store: PromStoreOptions::default(),
-            wal: StandaloneWalConfig::default(),
+            wal: DatanodeWalConfig::default(),
             storage: StorageConfig::default(),
             metadata_store: KvBackendConfig::default(),
             procedure: ProcedureConfig::default(),
@@ -204,7 +204,7 @@ impl StandaloneOptions {
         DatanodeOptions {
             node_id: Some(0),
             enable_telemetry: cloned_opts.enable_telemetry,
-            wal: cloned_opts.wal.into(),
+            wal: cloned_opts.wal,
             storage: cloned_opts.storage,
             region_engine: cloned_opts.region_engine,
             grpc: cloned_opts.grpc,
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 3f25c3fdc7f5..bf884dbd481c 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -24,7 +24,7 @@ use common_grpc::channel_manager::{
 use common_runtime::global::RuntimeOptions;
 use common_telemetry::logging::LoggingOptions;
 use common_wal::config::raft_engine::RaftEngineConfig;
-use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
+use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
 use frontend::frontend::FrontendOptions;
 use frontend::service_config::datanode::DatanodeClientOptions;
@@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
         },
         component: StandaloneOptions {
             default_timezone: Some("UTC".to_string()),
-            wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
+            wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
                 dir: Some("/tmp/greptimedb/wal".to_string()),
                 sync_period: Some(Duration::from_secs(10)),
                 ..Default::default()
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index 09b03c5b7dca..5fb3db6e20eb 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -123,6 +123,7 @@ pub fn prepare_wal_options(
 
 #[cfg(test)]
 mod tests {
+    use common_wal::config::kafka::common::KafkaTopicConfig;
     use common_wal::config::kafka::MetasrvKafkaConfig;
     use common_wal::test_util::run_test_with_kafka_wal;
 
@@ -160,9 +161,13 @@
             .collect::<Vec<_>>();
 
         // Creates a topic manager.
-        let config = MetasrvKafkaConfig {
+        let kafka_topic = KafkaTopicConfig {
             replication_factor: broker_endpoints.len() as i16,
+            ..Default::default()
+        };
+        let config = MetasrvKafkaConfig {
             broker_endpoints,
+            kafka_topic,
             ..Default::default()
         };
         let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
index fb0130d0dfc7..ec88e37cd14d 100644
--- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
@@ -56,11 +56,11 @@ impl TopicManager {
     /// Creates a new topic manager.
     pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
         // Topics should be created.
-        let topics = (0..config.num_topics)
-            .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+        let topics = (0..config.kafka_topic.num_topics)
+            .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix))
             .collect::<Vec<_>>();
 
-        let selector = match config.selector_type {
+        let selector = match config.kafka_topic.selector_type {
             TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
         };
 
@@ -76,7 +76,7 @@ impl TopicManager {
     /// The initializer first tries to restore persisted topics from the kv backend.
     /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
     pub async fn start(&self) -> Result<()> {
-        let num_topics = self.config.num_topics;
+        let num_topics = self.config.kafka_topic.num_topics;
         ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
 
         // Topics should be created.
@@ -185,9 +185,9 @@ impl TopicManager {
         match client
             .create_topic(
                 topic.clone(),
-                self.config.num_partitions,
-                self.config.replication_factor,
-                self.config.create_topic_timeout.as_millis() as i32,
+                self.config.kafka_topic.num_partitions,
+                self.config.kafka_topic.replication_factor,
+                self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
             )
             .await
         {
@@ -242,6 +242,7 @@ impl TopicManager {
 
 #[cfg(test)]
 mod tests {
+    use common_wal::config::kafka::common::KafkaTopicConfig;
     use common_wal::test_util::run_test_with_kafka_wal;
 
     use super::*;
@@ -283,9 +284,13 @@
             .collect::<Vec<_>>();
 
         // Creates a topic manager.
-        let config = MetasrvKafkaConfig {
+        let kafka_topic = KafkaTopicConfig {
             replication_factor: broker_endpoints.len() as i16,
+            ..Default::default()
+        };
+        let config = MetasrvKafkaConfig {
             broker_endpoints,
+            kafka_topic,
             ..Default::default()
         };
         let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 0b47c32ee21d..6edee1703c81 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -17,7 +17,7 @@ pub mod raft_engine;
 
 use serde::{Deserialize, Serialize};
 
-use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
+use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
 use crate::config::raft_engine::RaftEngineConfig;
 
 /// Wal configurations for metasrv.
@@ -43,80 +43,43 @@ impl Default for DatanodeWalConfig {
     }
 }
 
-/// Wal configurations for standalone.
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
-#[serde(tag = "provider", rename_all = "snake_case")]
-pub enum StandaloneWalConfig {
-    RaftEngine(RaftEngineConfig),
-    Kafka(StandaloneKafkaConfig),
-}
-
-impl Default for StandaloneWalConfig {
-    fn default() -> Self {
-        Self::RaftEngine(RaftEngineConfig::default())
-    }
-}
-
-impl From<StandaloneWalConfig> for MetasrvWalConfig {
-    fn from(config: StandaloneWalConfig) -> Self {
+impl From<DatanodeWalConfig> for MetasrvWalConfig {
+    fn from(config: DatanodeWalConfig) -> Self {
         match config {
-            StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
-            StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
+            DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
+            DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
                 broker_endpoints: config.broker_endpoints,
-                num_topics: config.num_topics,
-                selector_type: config.selector_type,
-                topic_name_prefix: config.topic_name_prefix,
-                num_partitions: config.num_partitions,
-                replication_factor: config.replication_factor,
-                create_topic_timeout: config.create_topic_timeout,
                 backoff: config.backoff,
+                kafka_topic: config.kafka_topic,
             }),
         }
     }
 }
 
-impl From<MetasrvWalConfig> for StandaloneWalConfig {
+impl From<MetasrvWalConfig> for DatanodeWalConfig {
     fn from(config: MetasrvWalConfig) -> Self {
         match config {
             MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
-            MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig {
+            MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
                 broker_endpoints: config.broker_endpoints,
-                num_topics: config.num_topics,
-                selector_type: config.selector_type,
-                topic_name_prefix: config.topic_name_prefix,
-                num_partitions: config.num_partitions,
-                replication_factor: config.replication_factor,
-                create_topic_timeout: config.create_topic_timeout,
                 backoff: config.backoff,
+                kafka_topic: config.kafka_topic,
                 ..Default::default()
             }),
         }
     }
 }
 
-impl From<StandaloneWalConfig> for DatanodeWalConfig {
-    fn from(config: StandaloneWalConfig) -> Self {
-        match config {
-            StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
-            StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
-                broker_endpoints: config.broker_endpoints,
-                max_batch_bytes: config.max_batch_bytes,
-                consumer_wait_timeout: config.consumer_wait_timeout,
-                backoff: config.backoff,
-            }),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
 
     use common_base::readable_size::ReadableSize;
+    use tests::kafka::common::KafkaTopicConfig;
 
     use super::*;
     use crate::config::kafka::common::BackoffConfig;
-    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
+    use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
    use crate::TopicSelectorType;
 
     #[test]
@@ -168,11 +131,6 @@
         let toml_str = r#"
             provider = "kafka"
             broker_endpoints = ["127.0.0.1:9092"]
-            num_topics = 32
-            selector_type = "round_robin"
-            topic_name_prefix = "greptimedb_wal_topic"
-            replication_factor = 1
-            create_topic_timeout = "30s"
             max_batch_bytes = "1MB"
             linger = "200ms"
             consumer_wait_timeout = "100ms"
@@ -180,24 +138,32 @@
             backoff_max = "10s"
             backoff_base = 2
             backoff_deadline = "5mins"
+            num_topics = 32
+            num_partitions = 1
+            selector_type = "round_robin"
+            replication_factor = 1
+            create_topic_timeout = "30s"
+            topic_name_prefix = "greptimedb_wal_topic"
         "#;
 
         // Deserialized to MetasrvWalConfig.
         let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
         let expected = MetasrvKafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            num_topics: 32,
-            selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
-            num_partitions: 1,
-            replication_factor: 1,
-            create_topic_timeout: Duration::from_secs(30),
             backoff: BackoffConfig {
                 init: Duration::from_millis(500),
                 max: Duration::from_secs(10),
                 base: 2,
                 deadline: Some(Duration::from_secs(60 * 5)),
             },
+            kafka_topic: KafkaTopicConfig {
+                num_topics: 32,
+                selector_type: TopicSelectorType::RoundRobin,
+                topic_name_prefix: "greptimedb_wal_topic".to_string(),
+                num_partitions: 1,
+                replication_factor: 1,
+                create_topic_timeout: Duration::from_secs(30),
+            },
         };
 
         assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
@@ -213,28 +179,15 @@
                 base: 2,
                 deadline: Some(Duration::from_secs(60 * 5)),
             },
-        };
-        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
-
-        // Deserialized to StandaloneWalConfig.
-        let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
-        let expected = StandaloneKafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            num_topics: 32,
-            selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
-            num_partitions: 1,
-            replication_factor: 1,
-            create_topic_timeout: Duration::from_secs(30),
-            max_batch_bytes: ReadableSize::mb(1),
-            consumer_wait_timeout: Duration::from_millis(100),
-            backoff: BackoffConfig {
-                init: Duration::from_millis(500),
-                max: Duration::from_secs(10),
-                base: 2,
-                deadline: Some(Duration::from_secs(60 * 5)),
+            kafka_topic: KafkaTopicConfig {
+                num_topics: 32,
+                selector_type: TopicSelectorType::RoundRobin,
+                topic_name_prefix: "greptimedb_wal_topic".to_string(),
+                num_partitions: 1,
+                replication_factor: 1,
+                create_topic_timeout: Duration::from_secs(30),
             },
         };
-        assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
+        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
     }
 }
diff --git a/src/common/wal/src/config/kafka.rs b/src/common/wal/src/config/kafka.rs
index f47e444521f2..27265d00987e 100644
--- a/src/common/wal/src/config/kafka.rs
+++ b/src/common/wal/src/config/kafka.rs
@@ -15,8 +15,6 @@
 pub mod common;
 pub mod datanode;
 pub mod metasrv;
-pub mod standalone;
 
 pub use datanode::DatanodeKafkaConfig;
 pub use metasrv::MetasrvKafkaConfig;
-pub use standalone::StandaloneKafkaConfig;
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index ea708d96159c..e61823938546 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -17,6 +17,8 @@ use std::time::Duration;
 use serde::{Deserialize, Serialize};
 use serde_with::with_prefix;
 
+use crate::{TopicSelectorType, TOPIC_NAME_PREFIX};
+
 with_prefix!(pub backoff_prefix "backoff_");
 
 /// Backoff configurations for kafka clients.
@@ -46,3 +48,35 @@ impl Default for BackoffConfig {
         }
     }
 }
+
+/// Topic configurations for kafka clients.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct KafkaTopicConfig {
+    /// Number of topics to be created upon start.
+    pub num_topics: usize,
+    /// Number of partitions per topic.
+    pub num_partitions: i32,
+    /// The type of the topic selector with which to select a topic for a region.
+    pub selector_type: TopicSelectorType,
+    /// The replication factor of each topic.
+    pub replication_factor: i16,
+    /// The timeout of topic creation.
+    #[serde(with = "humantime_serde")]
+    pub create_topic_timeout: Duration,
+    /// Topic name prefix.
+    pub topic_name_prefix: String,
+}
+
+impl Default for KafkaTopicConfig {
+    fn default() -> Self {
+        Self {
+            num_topics: 64,
+            num_partitions: 1,
+            selector_type: TopicSelectorType::RoundRobin,
+            replication_factor: 1,
+            create_topic_timeout: Duration::from_secs(30),
+            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
+        }
+    }
+}
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index ae97c1017cf5..b01e0635f637 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -17,7 +17,7 @@ use std::time::Duration;
 use common_base::readable_size::ReadableSize;
 use serde::{Deserialize, Serialize};
 
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
 use crate::BROKER_ENDPOINT;
 
 /// Kafka wal configurations for datanode.
@@ -36,6 +36,9 @@ pub struct DatanodeKafkaConfig {
     /// The backoff config.
     #[serde(flatten, with = "backoff_prefix")]
     pub backoff: BackoffConfig,
+    /// The kafka topic config.
+    #[serde(flatten)]
+    pub kafka_topic: KafkaTopicConfig,
 }
 
 impl Default for DatanodeKafkaConfig {
@@ -46,6 +49,7 @@ impl Default for DatanodeKafkaConfig {
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig::default(),
+            kafka_topic: KafkaTopicConfig::default(),
         }
     }
 }
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index 99efe762fbc0..519992e17579 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -12,12 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::time::Duration;
-
 use serde::{Deserialize, Serialize};
 
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
-use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
+use crate::BROKER_ENDPOINT;
 
 /// Kafka wal configurations for metasrv.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -25,37 +23,21 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
 pub struct MetasrvKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec<String>,
-    /// The number of topics to be created upon start.
-    pub num_topics: usize,
-    /// The type of the topic selector with which to select a topic for a region.
-    pub selector_type: TopicSelectorType,
-    /// Topic name prefix.
-    pub topic_name_prefix: String,
-    /// The number of partitions per topic.
-    pub num_partitions: i32,
-    /// The replication factor of each topic.
-    pub replication_factor: i16,
-    /// The timeout of topic creation.
-    #[serde(with = "humantime_serde")]
-    pub create_topic_timeout: Duration,
     /// The backoff config.
     #[serde(flatten, with = "backoff_prefix")]
     pub backoff: BackoffConfig,
+    /// The kafka config.
+    #[serde(flatten)]
+    pub kafka_topic: KafkaTopicConfig,
 }
 
 impl Default for MetasrvKafkaConfig {
     fn default() -> Self {
         let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
-        let replication_factor = broker_endpoints.len() as i16;
         Self {
             broker_endpoints,
-            num_topics: 64,
-            selector_type: TopicSelectorType::RoundRobin,
-            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
-            num_partitions: 1,
-            replication_factor,
-            create_topic_timeout: Duration::from_secs(30),
             backoff: BackoffConfig::default(),
+            kafka_topic: KafkaTopicConfig::default(),
         }
     }
 }
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 7bb29ce3318e..491a93086953 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -21,6 +21,7 @@ use common_query::Output;
 use common_recordbatch::util;
 use common_telemetry::warn;
 use common_test_util::find_workspace_path;
+use common_wal::config::kafka::common::KafkaTopicConfig;
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
 use frontend::instance::Instance;
@@ -231,8 +232,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option
Option