Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove the StandaloneKafkaConfig struct #4253

Merged
merged 21 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::StandaloneWalConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: StandaloneWalConfig,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -155,7 +155,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: StandaloneWalConfig::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -204,7 +204,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal.into(),
wal: cloned_opts.wal,
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_grpc::channel_manager::{
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use frontend::frontend::FrontendOptions;
use frontend::service_config::datanode::DatanodeClientOptions;
Expand Down Expand Up @@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()
Expand Down
7 changes: 6 additions & 1 deletion src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub fn prepare_wal_options(

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;

Expand Down Expand Up @@ -160,9 +161,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
let topics = (0..config.kafka_topic.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix))
.collect::<Vec<_>>();

let selector = match config.selector_type {
let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};

Expand All @@ -76,7 +76,7 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
let num_topics = self.config.num_topics;
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

// Topics should be created.
Expand Down Expand Up @@ -185,9 +185,9 @@ impl TopicManager {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
self.config.kafka_topic.num_partitions,
self.config.kafka_topic.replication_factor,
self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
)
.await
{
Expand Down Expand Up @@ -242,6 +242,7 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::test_util::run_test_with_kafka_wal;

use super::*;
Expand Down Expand Up @@ -283,9 +284,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
113 changes: 33 additions & 80 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod raft_engine;

use serde::{Deserialize, Serialize};

use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;

/// Wal configurations for metasrv.
Expand All @@ -43,80 +43,43 @@ impl Default for DatanodeWalConfig {
}
}

/// Wal configurations for standalone.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(StandaloneKafkaConfig),
}

impl Default for StandaloneWalConfig {
fn default() -> Self {
Self::RaftEngine(RaftEngineConfig::default())
}
}

impl From<StandaloneWalConfig> for MetasrvWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
impl From<DatanodeWalConfig> for MetasrvWalConfig {
fn from(config: DatanodeWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
}),
}
}
}

impl From<MetasrvWalConfig> for StandaloneWalConfig {
impl From<MetasrvWalConfig> for DatanodeWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig {
MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
..Default::default()
}),
}
}
}

impl From<StandaloneWalConfig> for DatanodeWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
}),
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use tests::kafka::common::KafkaTopicConfig;

use super::*;
use crate::config::kafka::common::BackoffConfig;
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::TopicSelectorType;

#[test]
Expand Down Expand Up @@ -168,36 +131,39 @@ mod tests {
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
replication_factor = 1
create_topic_timeout = "30s"
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
num_topics = 32
num_partitions = 1
selector_type = "round_robin"
replication_factor = 1
create_topic_timeout = "30s"
topic_name_prefix = "greptimedb_wal_topic"
"#;

// Deserialized to MetasrvWalConfig.
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetasrvKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

Expand All @@ -213,28 +179,15 @@ mod tests {
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));

// Deserialized to StandaloneWalConfig.
let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
let expected = StandaloneKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
};
assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
}
2 changes: 0 additions & 2 deletions src/common/wal/src/config/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
pub mod common;
pub mod datanode;
pub mod metasrv;
pub mod standalone;

pub use datanode::DatanodeKafkaConfig;
pub use metasrv::MetasrvKafkaConfig;
pub use standalone::StandaloneKafkaConfig;
34 changes: 34 additions & 0 deletions src/common/wal/src/config/kafka/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

use crate::{TopicSelectorType, TOPIC_NAME_PREFIX};

with_prefix!(pub backoff_prefix "backoff_");

/// Backoff configurations for kafka clients.
Expand Down Expand Up @@ -46,3 +48,35 @@ impl Default for BackoffConfig {
}
}
}

/// Topic configurations for kafka clients.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
/// Number of topics to be created upon start.
pub num_topics: usize,
/// Number of partitions per topic.
pub num_partitions: i32,
/// The type of the topic selector with which to select a topic for a region.
pub selector_type: TopicSelectorType,
/// The replication factor of each topic.
pub replication_factor: i16,
/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// Topic name prefix.
pub topic_name_prefix: String,
}

impl Default for KafkaTopicConfig {
fn default() -> Self {
Self {
num_topics: 64,
num_partitions: 1,
selector_type: TopicSelectorType::RoundRobin,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
}
}
}
Loading