From e8641ea4bcf980b2009cda1eec24e0fa6f888e20 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 2 Jul 2024 20:54:52 +0800 Subject: [PATCH 01/18] refactor: Remove the StandaloneKafkaConfig struct --- src/cmd/src/standalone.rs | 8 +- src/cmd/tests/load_config_test.rs | 4 +- src/common/wal/src/config.rs | 55 +++---------- src/common/wal/src/config/kafka.rs | 2 - src/common/wal/src/config/kafka/datanode.rs | 23 +++++- src/common/wal/src/config/kafka/standalone.rs | 79 ------------------- 6 files changed, 41 insertions(+), 130 deletions(-) delete mode 100644 src/common/wal/src/config/kafka/standalone.rs diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 9216dc35ea9d..fdd2f7868210 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -39,7 +39,7 @@ use common_telemetry::info; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; -use common_wal::config::StandaloneWalConfig; +use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; @@ -126,7 +126,7 @@ pub struct StandaloneOptions { pub opentsdb: OpentsdbOptions, pub influxdb: InfluxdbOptions, pub prom_store: PromStoreOptions, - pub wal: StandaloneWalConfig, + pub wal: DatanodeWalConfig, pub storage: StorageConfig, pub metadata_store: KvBackendConfig, pub procedure: ProcedureConfig, @@ -151,7 +151,7 @@ impl Default for StandaloneOptions { opentsdb: OpentsdbOptions::default(), influxdb: InfluxdbOptions::default(), prom_store: PromStoreOptions::default(), - wal: StandaloneWalConfig::default(), + wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), @@ -200,7 +200,7 @@ impl StandaloneOptions { DatanodeOptions { node_id: Some(0), enable_telemetry: cloned_opts.enable_telemetry, - wal: cloned_opts.wal.into(), + wal: cloned_opts.wal, storage: cloned_opts.storage, region_engine: cloned_opts.region_engine, rpc_addr: cloned_opts.grpc.addr, diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 80075b846e51..699ca826af25 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -20,7 +20,7 @@ use common_config::Configurable; use common_runtime::global::RuntimeOptions; use common_telemetry::logging::LoggingOptions; use common_wal::config::raft_engine::RaftEngineConfig; -use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig}; +use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use frontend::frontend::FrontendOptions; use frontend::service_config::datanode::DatanodeClientOptions; @@ -185,7 +185,7 @@ fn test_load_standalone_example_config() { }, component: StandaloneOptions { default_timezone: Some("UTC".to_string()), - wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig { + wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some("/tmp/greptimedb/wal".to_string()), sync_period: Some(Duration::from_secs(10)), ..Default::default() diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 072763782750..c2ec06e1003c 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -17,7 +17,7 @@ pub mod raft_engine; use serde::{Deserialize, Serialize}; -use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; +use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::config::raft_engine::RaftEngineConfig; /// Wal configurations for metasrv. @@ -43,25 +43,11 @@ impl Default for DatanodeWalConfig { } } -/// Wal configurations for standalone. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] -#[serde(tag = "provider", rename_all = "snake_case")] -pub enum StandaloneWalConfig { - RaftEngine(RaftEngineConfig), - Kafka(StandaloneKafkaConfig), -} - -impl Default for StandaloneWalConfig { - fn default() -> Self { - Self::RaftEngine(RaftEngineConfig::default()) - } -} - -impl From for MetasrvWalConfig { - fn from(config: StandaloneWalConfig) -> Self { +impl From for MetasrvWalConfig { + fn from(config: DatanodeWalConfig) -> Self { match config { - StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine, - StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { + DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine, + DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { broker_endpoints: config.broker_endpoints, num_topics: config.num_topics, selector_type: config.selector_type, @@ -75,11 +61,11 @@ impl From for MetasrvWalConfig { } } -impl From for StandaloneWalConfig { +impl From for DatanodeWalConfig { fn from(config: MetasrvWalConfig) -> Self { match config { MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()), - MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig { + MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig { broker_endpoints: config.broker_endpoints, num_topics: config.num_topics, selector_type: config.selector_type, @@ -94,22 +80,6 @@ impl From for StandaloneWalConfig { } } -impl From for DatanodeWalConfig { - fn from(config: StandaloneWalConfig) -> Self { - match config { - StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config), - StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig { - broker_endpoints: config.broker_endpoints, - compression: config.compression, - max_batch_size: config.max_batch_size, - linger: config.linger, - consumer_wait_timeout: config.consumer_wait_timeout, - backoff: config.backoff, - }), - } - } -} - #[cfg(test)] mod tests { use std::time::Duration; @@ -119,7 +89,7 @@ mod tests { use super::*; use crate::config::kafka::common::BackoffConfig; - use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig}; + use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use crate::TopicSelectorType; #[test] @@ -218,12 +188,13 @@ mod tests { base: 2, deadline: Some(Duration::from_secs(60 * 5)), }, + ..Default::default() }; assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); - // Deserialized to StandaloneWalConfig. - let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap(); - let expected = StandaloneKafkaConfig { + // Deserialized to DatanodeWalConfig. + let standalone_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); + let expected = DatanodeKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], num_topics: 32, selector_type: TopicSelectorType::RoundRobin, @@ -242,6 +213,6 @@ mod tests { deadline: Some(Duration::from_secs(60 * 5)), }, }; - assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected)); + assert_eq!(standalone_wal_config, DatanodeWalConfig::Kafka(expected)); } } diff --git a/src/common/wal/src/config/kafka.rs b/src/common/wal/src/config/kafka.rs index f47e444521f2..27265d00987e 100644 --- a/src/common/wal/src/config/kafka.rs +++ b/src/common/wal/src/config/kafka.rs @@ -15,8 +15,6 @@ pub mod common; pub mod datanode; pub mod metasrv; -pub mod standalone; pub use datanode::DatanodeKafkaConfig; pub use metasrv::MetasrvKafkaConfig; -pub use standalone::StandaloneKafkaConfig; diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index b15d13dffc2a..4e42aeba1f58 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -19,7 +19,7 @@ use rskafka::client::partition::Compression; use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; -use crate::BROKER_ENDPOINT; +use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; /// Kafka wal configurations for datanode. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -41,10 +41,25 @@ pub struct DatanodeKafkaConfig { /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + /// Number of topics to be created upon start. + pub num_topics: usize, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The type of the topic selector with which to select a topic for a region. + pub selector_type: TopicSelectorType, + /// The replication factor of each topic. + pub replication_factor: i16, + /// The timeout of topic creation. + #[serde(with = "humantime_serde")] + pub create_topic_timeout: Duration, + /// Topic name prefix. + pub topic_name_prefix: String, } impl Default for DatanodeKafkaConfig { fn default() -> Self { + let broker_endpoints = [BROKER_ENDPOINT.to_string()]; + let replication_factor = broker_endpoints.len() as i16; Self { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], compression: Compression::NoCompression, @@ -53,6 +68,12 @@ impl Default for DatanodeKafkaConfig { linger: Duration::from_millis(200), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), + num_topics: 64, + num_partitions: 1, + selector_type: TopicSelectorType::RoundRobin, + replication_factor, + create_topic_timeout: Duration::from_secs(30), + topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), } } } diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs deleted file mode 100644 index 3da8fa498092..000000000000 --- a/src/common/wal/src/config/kafka/standalone.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use common_base::readable_size::ReadableSize; -use rskafka::client::partition::Compression; -use serde::{Deserialize, Serialize}; - -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; -use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; - -/// Kafka wal configurations for standalone. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(default)] -pub struct StandaloneKafkaConfig { - /// The broker endpoints of the Kafka cluster. - pub broker_endpoints: Vec, - /// Number of topics to be created upon start. - pub num_topics: usize, - /// The type of the topic selector with which to select a topic for a region. - pub selector_type: TopicSelectorType, - /// Topic name prefix. - pub topic_name_prefix: String, - /// Number of partitions per topic. - pub num_partitions: i32, - /// The replication factor of each topic. - pub replication_factor: i16, - /// The timeout of topic creation. - #[serde(with = "humantime_serde")] - pub create_topic_timeout: Duration, - /// The compression algorithm used to compress kafka records. - #[serde(skip)] - pub compression: Compression, - /// The max size of a single producer batch. - pub max_batch_size: ReadableSize, - /// The linger duration of a kafka batch producer. - #[serde(with = "humantime_serde")] - pub linger: Duration, - /// The consumer wait timeout. - #[serde(with = "humantime_serde")] - pub consumer_wait_timeout: Duration, - /// The backoff config. - #[serde(flatten, with = "backoff_prefix")] - pub backoff: BackoffConfig, -} - -impl Default for StandaloneKafkaConfig { - fn default() -> Self { - let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; - let replication_factor = broker_endpoints.len() as i16; - Self { - broker_endpoints, - num_topics: 64, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), - num_partitions: 1, - replication_factor, - create_topic_timeout: Duration::from_secs(30), - compression: Compression::NoCompression, - // Warning: Kafka has a default limit of 1MB per message in a topic. - max_batch_size: ReadableSize::mb(1), - linger: Duration::from_millis(200), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig::default(), - } - } -} From 3bce938b3c3d30b168cf0d77a29f2e6458fb0852 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 2 Jul 2024 21:26:10 +0800 Subject: [PATCH 02/18] remove the redundant assignment --- src/common/wal/src/config.rs | 18 ------------------ src/common/wal/src/config/kafka/datanode.rs | 3 +-- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index c2ec06e1003c..6c3ac0ede25c 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -174,24 +174,6 @@ mod tests { }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); - // Deserialized to DatanodeWalConfig. - let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); - let expected = DatanodeKafkaConfig { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], - compression: Compression::default(), - max_batch_size: ReadableSize::mb(1), - linger: Duration::from_millis(200), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig { - init: Duration::from_millis(500), - max: Duration::from_secs(10), - base: 2, - deadline: Some(Duration::from_secs(60 * 5)), - }, - ..Default::default() - }; - assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); - // Deserialized to DatanodeWalConfig. let standalone_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); let expected = DatanodeKafkaConfig { diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 4e42aeba1f58..023e8426134b 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -58,8 +58,7 @@ pub struct DatanodeKafkaConfig { impl Default for DatanodeKafkaConfig { fn default() -> Self { - let broker_endpoints = [BROKER_ENDPOINT.to_string()]; - let replication_factor = broker_endpoints.len() as i16; + let replication_factor = 1; Self { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], compression: Compression::NoCompression, From f937b14732589a4249dafe2707ba23593f4ada81 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 2 Jul 2024 22:14:33 +0800 Subject: [PATCH 03/18] remove rudundant struct --- src/common/wal/src/config/kafka/standalone.rs | 77 ------------------- 1 file changed, 77 deletions(-) delete mode 100644 src/common/wal/src/config/kafka/standalone.rs diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs deleted file mode 100644 index ddee160bf642..000000000000 --- a/src/common/wal/src/config/kafka/standalone.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use common_base::readable_size::ReadableSize; -use rskafka::client::partition::Compression; -use serde::{Deserialize, Serialize}; - -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; -use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; - -/// Kafka wal configurations for standalone. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(default)] -pub struct StandaloneKafkaConfig { - /// The broker endpoints of the Kafka cluster. - pub broker_endpoints: Vec, - /// Number of topics to be created upon start. - pub num_topics: usize, - /// The type of the topic selector with which to select a topic for a region. - pub selector_type: TopicSelectorType, - /// Topic name prefix. - pub topic_name_prefix: String, - /// Number of partitions per topic. - pub num_partitions: i32, - /// The replication factor of each topic. - pub replication_factor: i16, - /// The timeout of topic creation. - #[serde(with = "humantime_serde")] - pub create_topic_timeout: Duration, - /// The compression algorithm used to compress kafka records. - #[serde(skip)] - pub compression: Compression, - /// TODO(weny): Remove the alias once we release v0.9. - /// The max size of a single producer batch. - #[serde(alias = "max_batch_size")] - pub max_batch_bytes: ReadableSize, - /// The consumer wait timeout. - #[serde(with = "humantime_serde")] - pub consumer_wait_timeout: Duration, - /// The backoff config. - #[serde(flatten, with = "backoff_prefix")] - pub backoff: BackoffConfig, -} - -impl Default for StandaloneKafkaConfig { - fn default() -> Self { - let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; - let replication_factor = broker_endpoints.len() as i16; - Self { - broker_endpoints, - num_topics: 64, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), - num_partitions: 1, - replication_factor, - create_topic_timeout: Duration::from_secs(30), - compression: Compression::NoCompression, - // Warning: Kafka has a default limit of 1MB per message in a topic. - max_batch_bytes: ReadableSize::mb(1), - consumer_wait_timeout: Duration::from_millis(100), - backoff: BackoffConfig::default(), - } - } -} From 75dbad4827d02eb820b96f0d61de1c38fe9b4a1b Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 2 Jul 2024 22:42:17 +0800 Subject: [PATCH 04/18] simplify replication_factor --- src/common/wal/src/config/kafka/datanode.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 43c01bda5e9a..f49980294957 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -57,7 +57,6 @@ pub struct DatanodeKafkaConfig { impl Default for DatanodeKafkaConfig { fn default() -> Self { - let replication_factor = 1; Self { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], compression: Compression::NoCompression, @@ -68,7 +67,7 @@ impl Default for DatanodeKafkaConfig { num_topics: 64, num_partitions: 1, selector_type: TopicSelectorType::RoundRobin, - replication_factor, + replication_factor: 1, create_topic_timeout: Duration::from_secs(30), topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), } From 8e62a4f6ec75e27af19414840879b0b3c141c8d5 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 3 Jul 2024 20:17:53 +0800 Subject: [PATCH 05/18] add KafkaTopicConfig --- .../kafka/topic_manager.rs | 21 +++++++----- src/common/wal/src/config.rs | 14 ++------ src/common/wal/src/config/kafka/common.rs | 33 +++++++++++++++++++ src/common/wal/src/config/kafka/datanode.rs | 27 ++++----------- src/common/wal/src/config/kafka/metasrv.rs | 23 ++++--------- 5 files changed, 61 insertions(+), 57 deletions(-) diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index ab64a6fa0fb0..9747fc8b80d3 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -56,11 +56,11 @@ impl TopicManager { /// Creates a new topic manager. pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { // Topics should be created. - let topics = (0..config.num_topics) - .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) + let topics = (0..config.kafka_topic.num_topics) + .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix)) .collect::>(); - let selector = match config.selector_type { + let selector = match config.kafka_topic.selector_type { TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; @@ -76,7 +76,7 @@ impl TopicManager { /// The initializer first tries to restore persisted topics from the kv backend. /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn start(&self) -> Result<()> { - let num_topics = self.config.num_topics; + let num_topics = self.config.kafka_topic.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); // Topics should be created. @@ -185,9 +185,9 @@ impl TopicManager { match client .create_topic( topic.clone(), - self.config.num_partitions, - self.config.replication_factor, - self.config.create_topic_timeout.as_millis() as i32, + self.config.kafka_topic.num_partitions, + self.config.kafka_topic.replication_factor, + self.config.kafka_topic.create_topic_timeout.as_millis() as i32, ) .await { @@ -242,6 +242,7 @@ impl TopicManager { #[cfg(test)] mod tests { + use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::test_util::run_test_with_kafka_wal; use super::*; @@ -283,9 +284,13 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetasrvKafkaConfig { + let kafka_topic = KafkaTopicConfig { replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { broker_endpoints, + kafka_topic, ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 5a112d7b66c1..18b5000576b0 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -49,13 +49,8 @@ impl From for MetasrvWalConfig { DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine, DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig { broker_endpoints: config.broker_endpoints, - num_topics: config.num_topics, - selector_type: config.selector_type, - topic_name_prefix: config.topic_name_prefix, - num_partitions: config.num_partitions, - replication_factor: config.replication_factor, - create_topic_timeout: config.create_topic_timeout, backoff: config.backoff, + kafka_topic: config.kafka_topic, }), } } @@ -67,13 +62,8 @@ impl From for DatanodeWalConfig { MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()), MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig { broker_endpoints: config.broker_endpoints, - num_topics: config.num_topics, - selector_type: config.selector_type, - topic_name_prefix: config.topic_name_prefix, - num_partitions: config.num_partitions, - replication_factor: config.replication_factor, - create_topic_timeout: config.create_topic_timeout, backoff: config.backoff, + kafka_topic: config.kafka_topic, ..Default::default() }), } diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ea708d96159c..3bdf92ffc77a 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -17,6 +17,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use serde_with::with_prefix; +use crate::{TopicSelectorType, TOPIC_NAME_PREFIX}; + with_prefix!(pub backoff_prefix "backoff_"); /// Backoff configurations for kafka clients. @@ -46,3 +48,34 @@ impl Default for BackoffConfig { } } } + +/// Topic configurations for kafka clients. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct KafkaTopicConfig { + /// Number of topics to be created upon start. + pub num_topics: usize, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The type of the topic selector with which to select a topic for a region. + pub selector_type: TopicSelectorType, + /// The replication factor of each topic. + pub replication_factor: i16, + /// The timeout of topic creation. + #[serde(with = "humantime_serde")] + pub create_topic_timeout: Duration, + /// Topic name prefix. + pub topic_name_prefix: String, +} + +impl Default for KafkaTopicConfig { + fn default() -> Self { + Self { + num_topics: 64, + num_partitions: 1, + selector_type: TopicSelectorType::RoundRobin, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + } + } +} diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index f49980294957..63d68fd3a5a3 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -18,8 +18,8 @@ use common_base::readable_size::ReadableSize; use rskafka::client::partition::Compression; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; -use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; +use crate::BROKER_ENDPOINT; /// Kafka wal configurations for datanode. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -40,19 +40,9 @@ pub struct DatanodeKafkaConfig { /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, - /// Number of topics to be created upon start. - pub num_topics: usize, - /// Number of partitions per topic. - pub num_partitions: i32, - /// The type of the topic selector with which to select a topic for a region. - pub selector_type: TopicSelectorType, - /// The replication factor of each topic. - pub replication_factor: i16, - /// The timeout of topic creation. - #[serde(with = "humantime_serde")] - pub create_topic_timeout: Duration, - /// Topic name prefix. - pub topic_name_prefix: String, + /// The kafka topic config. + #[serde(flatten)] + pub kafka_topic: KafkaTopicConfig, } impl Default for DatanodeKafkaConfig { @@ -64,12 +54,7 @@ impl Default for DatanodeKafkaConfig { max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), - num_topics: 64, - num_partitions: 1, - selector_type: TopicSelectorType::RoundRobin, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + kafka_topic: KafkaTopicConfig::default(), } } } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 99efe762fbc0..6698c3393136 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -16,7 +16,7 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; /// Kafka wal configurations for metasrv. @@ -25,37 +25,28 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; pub struct MetasrvKafkaConfig { /// The broker endpoints of the Kafka cluster. pub broker_endpoints: Vec, - /// The number of topics to be created upon start. - pub num_topics: usize, - /// The type of the topic selector with which to select a topic for a region. - pub selector_type: TopicSelectorType, - /// Topic name prefix. - pub topic_name_prefix: String, - /// The number of partitions per topic. - pub num_partitions: i32, - /// The replication factor of each topic. - pub replication_factor: i16, - /// The timeout of topic creation. - #[serde(with = "humantime_serde")] - pub create_topic_timeout: Duration, /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + pub kafka_topic: KafkaTopicConfig, } impl Default for MetasrvKafkaConfig { fn default() -> Self { let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; let replication_factor = broker_endpoints.len() as i16; - Self { - broker_endpoints, + let kafka_topic = KafkaTopicConfig { num_topics: 64, selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), num_partitions: 1, replication_factor, create_topic_timeout: Duration::from_secs(30), + }; + Self { + broker_endpoints, backoff: BackoffConfig::default(), + kafka_topic, } } } From f6403a7a4a6174e15131486e31010b60a070b835 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 3 Jul 2024 20:29:56 +0800 Subject: [PATCH 06/18] fix check --- src/common/wal/src/config.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 18b5000576b0..9167906dcc89 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -76,6 +76,7 @@ mod tests { use common_base::readable_size::ReadableSize; use rskafka::client::partition::Compression; + use tests::kafka::common::KafkaTopicConfig; use super::*; use crate::config::kafka::common::BackoffConfig; @@ -149,18 +150,20 @@ mod tests { let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap(); let expected = MetasrvKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), backoff: BackoffConfig { init: Duration::from_millis(500), max: Duration::from_secs(10), base: 2, deadline: Some(Duration::from_secs(60 * 5)), }, + kafka_topic: KafkaTopicConfig { + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + }, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); @@ -168,12 +171,6 @@ mod tests { let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); let expected = DatanodeKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], - num_topics: 32, - selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), - num_partitions: 1, - replication_factor: 1, - create_topic_timeout: Duration::from_secs(30), compression: Compression::default(), max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), @@ -183,6 +180,14 @@ mod tests { base: 2, deadline: Some(Duration::from_secs(60 * 5)), }, + kafka_topic: KafkaTopicConfig { + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + }, }; assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); } From 286382f56c44fb34023e1f46fb96233a1f3e414d Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 3 Jul 2024 21:23:04 +0800 Subject: [PATCH 07/18] fix check --- src/common/meta/src/wal_options_allocator.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 09b03c5b7dca..5fb3db6e20eb 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -123,6 +123,7 @@ pub fn prepare_wal_options( #[cfg(test)] mod tests { + use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::test_util::run_test_with_kafka_wal; @@ -160,9 +161,13 @@ mod tests { .collect::>(); // Creates a topic manager. - let config = MetasrvKafkaConfig { + let kafka_topic = KafkaTopicConfig { replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { broker_endpoints, + kafka_topic, ..Default::default() }; let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; From d39c0d923510c5c74baa1851bbe1801703bc14b7 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 3 Jul 2024 21:37:34 +0800 Subject: [PATCH 08/18] fix check --- tests-integration/src/tests/test_util.rs | 15 +++++-- tests-integration/tests/region_migration.rs | 50 +++++++++++++++------ 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 7bb29ce3318e..491a93086953 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -21,6 +21,7 @@ use common_query::Output; use common_recordbatch::util; use common_telemetry::warn; use common_test_util::find_workspace_path; +use common_wal::config::kafka::common::KafkaTopicConfig; use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use frontend::instance::Instance; @@ -231,8 +232,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option Date: Wed, 3 Jul 2024 23:16:08 +0800 Subject: [PATCH 09/18] add flatten with --- config/config.md | 10 +++++----- config/metasrv.example.toml | 12 ++++++------ .../wal_options_allocator/kafka/topic_manager.rs | 2 +- src/common/wal/src/config.rs | 15 ++++++++------- src/common/wal/src/config/kafka/common.rs | 5 +++-- src/common/wal/src/config/kafka/datanode.rs | 6 ++++-- src/common/wal/src/config/kafka/metasrv.rs | 7 +++++-- tests-integration/src/tests/test_util.rs | 4 ++-- tests-integration/tests/region_migration.rs | 14 +++++++------- 9 files changed, 41 insertions(+), 34 deletions(-) diff --git a/config/config.md b/config/config.md index a594e7368074..c06fc92b06d3 100644 --- a/config/config.md +++ b/config/config.md @@ -269,11 +269,11 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | -| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | -| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | -| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | +| `wal.kafka_topic_num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.kafka_topic_selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | +| `wal.kafka_topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | +| `wal.kafka_topic_replication_factor` | Integer | `1` | Expected number of replicas of each partition. | +| `wal.kafka_topic_create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | | `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. | | `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 1128d274cef2..cbb02c220b9a 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -77,21 +77,21 @@ provider = "raft_engine" broker_endpoints = ["127.0.0.1:9092"] ## Number of topics to be created upon start. -num_topics = 64 +kafka_topic_num_topics = 64 ## Topic selector type. ## Available selector types: ## - `round_robin` (default) -selector_type = "round_robin" +kafka_topic_selector_type = "round_robin" -## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. -topic_name_prefix = "greptimedb_wal_topic" +## A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. +kafka_topic_name_prefix = "greptimedb_wal_topic" ## Expected number of replicas of each partition. -replication_factor = 1 +kafka_topic_replication_factor = 1 ## Above which a topic creation operation will be cancelled. -create_topic_timeout = "30s" +kafka_topic_create_topic_timeout = "30s" ## The initial backoff for kafka clients. backoff_init = "500ms" diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 9747fc8b80d3..cf8a194a7efb 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -57,7 +57,7 @@ impl TopicManager { pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { // Topics should be created. let topics = (0..config.kafka_topic.num_topics) - .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix)) + .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.name_prefix)) .collect::>(); let selector = match config.kafka_topic.selector_type { diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 9167906dcc89..026ed0b60d2c 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -132,11 +132,6 @@ mod tests { let toml_str = r#" provider = "kafka" broker_endpoints = ["127.0.0.1:9092"] - num_topics = 32 - selector_type = "round_robin" - topic_name_prefix = "greptimedb_wal_topic" - replication_factor = 1 - create_topic_timeout = "30s" max_batch_bytes = "1MB" linger = "200ms" consumer_wait_timeout = "100ms" @@ -144,6 +139,12 @@ mod tests { backoff_max = "10s" backoff_base = 2 backoff_deadline = "5mins" + kafka_topic_num_topics = 32 + kafka_topic_num_partitions = 1 + kafka_topic_selector_type = "round_robin" + kafka_topic_replication_factor = 1 + kafka_topic_create_topic_timeout = "30s" + kafka_topic_name_prefix = "greptimedb_wal_topic" "#; // Deserialized to MetasrvWalConfig. @@ -159,7 +160,7 @@ mod tests { kafka_topic: KafkaTopicConfig { num_topics: 32, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), + name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), @@ -183,7 +184,7 @@ mod tests { kafka_topic: KafkaTopicConfig { num_topics: 32, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_topic".to_string(), + name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index 3bdf92ffc77a..c5548480b448 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -20,6 +20,7 @@ use serde_with::with_prefix; use crate::{TopicSelectorType, TOPIC_NAME_PREFIX}; with_prefix!(pub backoff_prefix "backoff_"); +with_prefix!(pub kafka_topic_prefix "kafka_topic_"); /// Backoff configurations for kafka clients. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -64,7 +65,7 @@ pub struct KafkaTopicConfig { #[serde(with = "humantime_serde")] pub create_topic_timeout: Duration, /// Topic name prefix. - pub topic_name_prefix: String, + pub name_prefix: String, } impl Default for KafkaTopicConfig { @@ -75,7 +76,7 @@ impl Default for KafkaTopicConfig { selector_type: TopicSelectorType::RoundRobin, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + name_prefix: TOPIC_NAME_PREFIX.to_string(), } } } diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 63d68fd3a5a3..a4b36d2131ba 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -18,7 +18,9 @@ use common_base::readable_size::ReadableSize; use rskafka::client::partition::Compression; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; +use crate::config::kafka::common::{ + backoff_prefix, kafka_topic_prefix, BackoffConfig, KafkaTopicConfig, +}; use crate::BROKER_ENDPOINT; /// Kafka wal configurations for datanode. @@ -41,7 +43,7 @@ pub struct DatanodeKafkaConfig { #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, /// The kafka topic config. - #[serde(flatten)] + #[serde(flatten, with = "kafka_topic_prefix")] pub kafka_topic: KafkaTopicConfig, } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 6698c3393136..359435633049 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -16,7 +16,9 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; +use crate::config::kafka::common::{ + backoff_prefix, kafka_topic_prefix, BackoffConfig, KafkaTopicConfig, +}; use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; /// Kafka wal configurations for metasrv. @@ -28,6 +30,7 @@ pub struct MetasrvKafkaConfig { /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + #[serde(flatten, with = "kafka_topic_prefix")] pub kafka_topic: KafkaTopicConfig, } @@ -38,7 +41,7 @@ impl Default for MetasrvKafkaConfig { let kafka_topic = KafkaTopicConfig { num_topics: 64, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + name_prefix: TOPIC_NAME_PREFIX.to_string(), num_partitions: 1, replication_factor, create_topic_timeout: Duration::from_secs(30), diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 491a93086953..0d2a4055ecf7 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -233,7 +233,7 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option Date: Wed, 3 Jul 2024 23:19:06 +0800 Subject: [PATCH 10/18] revert config.md --- config/config.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/config/config.md b/config/config.md index c06fc92b06d3..a594e7368074 100644 --- a/config/config.md +++ b/config/config.md @@ -269,11 +269,11 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.kafka_topic_num_topics` | Integer | `64` | Number of topics to be created upon start. | -| `wal.kafka_topic_selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.kafka_topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | -| `wal.kafka_topic_replication_factor` | Integer | `1` | Expected number of replicas of each partition. | -| `wal.kafka_topic_create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | +| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | +| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | +| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | | `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. | | `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. | From d2fefab46f541255a744b0522a678dbe359d8c99 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 3 Jul 2024 23:58:48 +0800 Subject: [PATCH 11/18] fix test params --- config/config.md | 10 +++++----- tests/conf/metasrv-test.toml.template | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/config/config.md b/config/config.md index a594e7368074..c75f03665b42 100644 --- a/config/config.md +++ b/config/config.md @@ -269,11 +269,11 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | -| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | -| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | -| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | +| `wal.kafka_topic_num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.kafka_topic_selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | +| `wal.kafka_topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | +| `wal.kafka_topic_replication_factor` | Integer | `1` | Expected number of replicas of each partition. | +| `wal.kafka_topic_create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | | `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. | | `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. | diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index ecd69473249a..ba499a642b57 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -4,7 +4,7 @@ provider = "raft_engine" {{ else }} provider = "kafka" broker_endpoints = {kafka_wal_broker_endpoints | unescaped} -num_topics = 64 +kafka_topic_num_topics = 64 selector_type = "round_robin" topic_name_prefix = "distributed_test_greptimedb_wal_topic" {{ endif }} From 3481f13fad36b58f037cb469eab0cbcf481a9264 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 4 Jul 2024 22:36:43 +0800 Subject: [PATCH 12/18] fix test param --- tests/conf/metasrv-test.toml.template | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index ba499a642b57..c30148ca9c06 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -5,6 +5,6 @@ provider = "raft_engine" provider = "kafka" broker_endpoints = {kafka_wal_broker_endpoints | unescaped} kafka_topic_num_topics = 64 -selector_type = "round_robin" -topic_name_prefix = "distributed_test_greptimedb_wal_topic" +kafka_topic_selector_type = "round_robin" +kafka_topic_name_prefix = "distributed_test_greptimedb_wal_topic" {{ endif }} From def09224b6ba9a9ac20d3af3618887776a034d08 Mon Sep 17 00:00:00 2001 From: irenjj Date: Fri, 5 Jul 2024 22:50:30 +0800 Subject: [PATCH 13/18] fix missing params when provider is kafka --- src/common/wal/src/config/kafka/common.rs | 1 + src/common/wal/src/config/kafka/metasrv.rs | 16 +++------------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index c5548480b448..d99595aadab1 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -52,6 +52,7 @@ impl Default for BackoffConfig { /// Topic configurations for kafka clients. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] pub struct KafkaTopicConfig { /// Number of topics to be created upon start. pub num_topics: usize, diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 359435633049..1737bf6bc48c 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -12,14 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; - use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ backoff_prefix, kafka_topic_prefix, BackoffConfig, KafkaTopicConfig, }; -use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; +use crate::BROKER_ENDPOINT; /// Kafka wal configurations for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -30,6 +28,7 @@ pub struct MetasrvKafkaConfig { /// The backoff config. #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, + /// The kafka config. #[serde(flatten, with = "kafka_topic_prefix")] pub kafka_topic: KafkaTopicConfig, } @@ -37,19 +36,10 @@ pub struct MetasrvKafkaConfig { impl Default for MetasrvKafkaConfig { fn default() -> Self { let broker_endpoints = vec![BROKER_ENDPOINT.to_string()]; - let replication_factor = broker_endpoints.len() as i16; - let kafka_topic = KafkaTopicConfig { - num_topics: 64, - selector_type: TopicSelectorType::RoundRobin, - name_prefix: TOPIC_NAME_PREFIX.to_string(), - num_partitions: 1, - replication_factor, - create_topic_timeout: Duration::from_secs(30), - }; Self { broker_endpoints, backoff: BackoffConfig::default(), - kafka_topic, + kafka_topic: KafkaTopicConfig::default(), } } } From af55429c980cf07e6931294fb3bf0eed64867f5c Mon Sep 17 00:00:00 2001 From: irenjj Date: Sat, 6 Jul 2024 11:10:22 +0800 Subject: [PATCH 14/18] remove unsed files --- tests-integration/fixtures/kafka/docker.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 tests-integration/fixtures/kafka/docker.json diff --git a/tests-integration/fixtures/kafka/docker.json b/tests-integration/fixtures/kafka/docker.json deleted file mode 100644 index 8b137891791f..000000000000 --- a/tests-integration/fixtures/kafka/docker.json +++ /dev/null @@ -1 +0,0 @@ - From 51fa276951b4d1bb24a0aa25cc8db95f980ced74 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 10 Jul 2024 22:33:00 +0800 Subject: [PATCH 15/18] remove with prefix --- config/config.md | 10 +++++----- config/metasrv.example.toml | 10 +++++----- .../wal_options_allocator/kafka/topic_manager.rs | 2 +- src/common/wal/src/config.rs | 16 ++++++++-------- src/common/wal/src/config/kafka/common.rs | 5 ++--- src/common/wal/src/config/kafka/datanode.rs | 6 ++---- src/common/wal/src/config/kafka/metasrv.rs | 6 ++---- tests/conf/metasrv-test.toml.template | 6 +++--- 8 files changed, 28 insertions(+), 33 deletions(-) diff --git a/config/config.md b/config/config.md index 727f83289624..9652a57aa79b 100644 --- a/config/config.md +++ b/config/config.md @@ -279,11 +279,11 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.kafka_topic_num_topics` | Integer | `64` | Number of topics to be created upon start. | -| `wal.kafka_topic_selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.kafka_topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | -| `wal.kafka_topic_replication_factor` | Integer | `1` | Expected number of replicas of each partition. | -| `wal.kafka_topic_create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | +| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | +| `wal.topic_anme_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | +| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | +| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | | `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. | | `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index cbb02c220b9a..d26955ce37de 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -77,21 +77,21 @@ provider = "raft_engine" broker_endpoints = ["127.0.0.1:9092"] ## Number of topics to be created upon start. -kafka_topic_num_topics = 64 +num_topics = 64 ## Topic selector type. ## Available selector types: ## - `round_robin` (default) -kafka_topic_selector_type = "round_robin" +selector_type = "round_robin" ## A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. -kafka_topic_name_prefix = "greptimedb_wal_topic" +topic_name_prefix = "greptimedb_wal_topic" ## Expected number of replicas of each partition. -kafka_topic_replication_factor = 1 +replication_factor = 1 ## Above which a topic creation operation will be cancelled. -kafka_topic_create_topic_timeout = "30s" +create_topic_timeout = "30s" ## The initial backoff for kafka clients. backoff_init = "500ms" diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index e93f8cf678cc..ec88e37cd14d 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -57,7 +57,7 @@ impl TopicManager { pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self { // Topics should be created. let topics = (0..config.kafka_topic.num_topics) - .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.name_prefix)) + .map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix)) .collect::>(); let selector = match config.kafka_topic.selector_type { diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 4aa8056cb6e8..b5849b039d98 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -139,12 +139,12 @@ mod tests { backoff_max = "10s" backoff_base = 2 backoff_deadline = "5mins" - kafka_topic_num_topics = 32 - kafka_topic_num_partitions = 1 - kafka_topic_selector_type = "round_robin" - kafka_topic_replication_factor = 1 - kafka_topic_create_topic_timeout = "30s" - kafka_topic_name_prefix = "greptimedb_wal_topic" + num_topics = 32 + num_partitions = 1 + selector_type = "round_robin" + replication_factor = 1 + create_topic_timeout = "30s" + topic_name_prefix = "greptimedb_wal_topic" "#; // Deserialized to MetasrvWalConfig. @@ -160,7 +160,7 @@ mod tests { kafka_topic: KafkaTopicConfig { num_topics: 32, selector_type: TopicSelectorType::RoundRobin, - name_prefix: "greptimedb_wal_topic".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), @@ -183,7 +183,7 @@ mod tests { kafka_topic: KafkaTopicConfig { num_topics: 32, selector_type: TopicSelectorType::RoundRobin, - name_prefix: "greptimedb_wal_topic".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index d99595aadab1..e61823938546 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -20,7 +20,6 @@ use serde_with::with_prefix; use crate::{TopicSelectorType, TOPIC_NAME_PREFIX}; with_prefix!(pub backoff_prefix "backoff_"); -with_prefix!(pub kafka_topic_prefix "kafka_topic_"); /// Backoff configurations for kafka clients. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -66,7 +65,7 @@ pub struct KafkaTopicConfig { #[serde(with = "humantime_serde")] pub create_topic_timeout: Duration, /// Topic name prefix. - pub name_prefix: String, + pub topic_name_prefix: String, } impl Default for KafkaTopicConfig { @@ -77,7 +76,7 @@ impl Default for KafkaTopicConfig { selector_type: TopicSelectorType::RoundRobin, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), - name_prefix: TOPIC_NAME_PREFIX.to_string(), + topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), } } } diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 6d83253d6a34..b01e0635f637 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -17,9 +17,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{ - backoff_prefix, kafka_topic_prefix, BackoffConfig, KafkaTopicConfig, -}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; use crate::BROKER_ENDPOINT; /// Kafka wal configurations for datanode. @@ -39,7 +37,7 @@ pub struct DatanodeKafkaConfig { #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, /// The kafka topic config. - #[serde(flatten, with = "kafka_topic_prefix")] + #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 1737bf6bc48c..519992e17579 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -14,9 +14,7 @@ use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{ - backoff_prefix, kafka_topic_prefix, BackoffConfig, KafkaTopicConfig, -}; +use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; use crate::BROKER_ENDPOINT; /// Kafka wal configurations for metasrv. @@ -29,7 +27,7 @@ pub struct MetasrvKafkaConfig { #[serde(flatten, with = "backoff_prefix")] pub backoff: BackoffConfig, /// The kafka config. - #[serde(flatten, with = "kafka_topic_prefix")] + #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, } diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index c30148ca9c06..0b27804fe004 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -4,7 +4,7 @@ provider = "raft_engine" {{ else }} provider = "kafka" broker_endpoints = {kafka_wal_broker_endpoints | unescaped} -kafka_topic_num_topics = 64 -kafka_topic_selector_type = "round_robin" -kafka_topic_name_prefix = "distributed_test_greptimedb_wal_topic" +num_topics = 64 +selector_type = "round_robin" +name_prefix = "distributed_test_greptimedb_wal_topic" {{ endif }} From 816176cc633dffe19c7db90251e371469e50ba28 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 10 Jul 2024 22:35:38 +0800 Subject: [PATCH 16/18] fix doc --- config/config.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.md b/config/config.md index 9652a57aa79b..3a9cee8037dd 100644 --- a/config/config.md +++ b/config/config.md @@ -281,7 +281,7 @@ | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | | `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_anme_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | From 9731e5528d60cdbf8f1a1a92251b31d0d2bf00b4 Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 10 Jul 2024 22:55:41 +0800 Subject: [PATCH 17/18] fix test --- config/config.md | 2 +- config/metasrv.example.toml | 2 +- tests-integration/src/tests/test_util.rs | 4 ++-- tests-integration/tests/region_migration.rs | 14 +++++++------- tests/conf/metasrv-test.toml.template | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/config/config.md b/config/config.md index 3a9cee8037dd..88380597872b 100644 --- a/config/config.md +++ b/config/config.md @@ -281,7 +281,7 @@ | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | | `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index d26955ce37de..1128d274cef2 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -84,7 +84,7 @@ num_topics = 64 ## - `round_robin` (default) selector_type = "round_robin" -## A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. +## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. topic_name_prefix = "greptimedb_wal_topic" ## Expected number of replicas of each partition. diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 0d2a4055ecf7..491a93086953 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -233,7 +233,7 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option Date: Wed, 10 Jul 2024 23:00:26 +0800 Subject: [PATCH 18/18] fix clippy --- src/common/wal/src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index b5849b039d98..6edee1703c81 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -75,7 +75,6 @@ mod tests { use std::time::Duration; use common_base::readable_size::ReadableSize; - use rskafka::client::partition::Compression; use tests::kafka::common::KafkaTopicConfig; use super::*;