Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove the StandaloneKafkaConfig struct #4253

Merged
merged 21 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.kafka_topic_num_topics` | Integer | `64` | Number of topics to be created upon start. |
| `wal.kafka_topic_selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.kafka_topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`. |
| `wal.kafka_topic_replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.kafka_topic_create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
| `wal.backoff_max` | String | `10s` | The maximum backoff for kafka clients. |
| `wal.backoff_base` | Integer | `2` | Exponential backoff rate, i.e. next backoff = base * current backoff. |
Expand Down
12 changes: 6 additions & 6 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ provider = "raft_engine"
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
num_topics = 64
kafka_topic_num_topics = 64

## Topic selector type.
## Available selector types:
## - `round_robin` (default)
selector_type = "round_robin"
kafka_topic_selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
topic_name_prefix = "greptimedb_wal_topic"
## A Kafka topic is constructed by concatenating `name_prefix` and `topic_id`.
kafka_topic_name_prefix = "greptimedb_wal_topic"

## Expected number of replicas of each partition.
replication_factor = 1
kafka_topic_replication_factor = 1

## Above which a topic creation operation will be cancelled.
create_topic_timeout = "30s"
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
kafka_topic_create_topic_timeout = "30s"
## The initial backoff for kafka clients.
backoff_init = "500ms"

Expand Down
8 changes: 4 additions & 4 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::StandaloneWalConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
Expand Down Expand Up @@ -129,7 +129,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: StandaloneWalConfig,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -154,7 +154,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: StandaloneWalConfig::default(),
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -203,7 +203,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal.into(),
wal: cloned_opts.wal,
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_grpc::channel_manager::{
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use frontend::frontend::FrontendOptions;
use frontend::service_config::datanode::DatanodeClientOptions;
Expand Down Expand Up @@ -202,7 +202,7 @@ fn test_load_standalone_example_config() {
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()
Expand Down
7 changes: 6 additions & 1 deletion src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub fn prepare_wal_options(

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;

Expand Down Expand Up @@ -160,9 +161,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
let topics = (0..config.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
let topics = (0..config.kafka_topic.num_topics)
.map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.name_prefix))
.collect::<Vec<_>>();

let selector = match config.selector_type {
let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};

Expand All @@ -76,7 +76,7 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
let num_topics = self.config.num_topics;
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

// Topics should be created.
Expand Down Expand Up @@ -185,9 +185,9 @@ impl TopicManager {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
self.config.kafka_topic.num_partitions,
self.config.kafka_topic.replication_factor,
self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
)
.await
{
Expand Down Expand Up @@ -242,6 +242,7 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::test_util::run_test_with_kafka_wal;

use super::*;
Expand Down Expand Up @@ -283,9 +284,13 @@ mod tests {
.collect::<Vec<_>>();

// Creates a topic manager.
let config = MetasrvKafkaConfig {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
Expand Down
115 changes: 33 additions & 82 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod raft_engine;

use serde::{Deserialize, Serialize};

use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;

/// Wal configurations for metasrv.
Expand All @@ -43,82 +43,44 @@ impl Default for DatanodeWalConfig {
}
}

/// Wal configurations for standalone.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(StandaloneKafkaConfig),
}

impl Default for StandaloneWalConfig {
fn default() -> Self {
Self::RaftEngine(RaftEngineConfig::default())
}
}

impl From<StandaloneWalConfig> for MetasrvWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
impl From<DatanodeWalConfig> for MetasrvWalConfig {
fn from(config: DatanodeWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
}),
}
}
}

impl From<MetasrvWalConfig> for StandaloneWalConfig {
impl From<MetasrvWalConfig> for DatanodeWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig {
MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
..Default::default()
}),
}
}
}

impl From<StandaloneWalConfig> for DatanodeWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
compression: config.compression,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
}),
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use tests::kafka::common::KafkaTopicConfig;

use super::*;
use crate::config::kafka::common::BackoffConfig;
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::TopicSelectorType;

#[test]
Expand Down Expand Up @@ -170,36 +132,39 @@ mod tests {
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
replication_factor = 1
create_topic_timeout = "30s"
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
kafka_topic_num_topics = 32
kafka_topic_num_partitions = 1
kafka_topic_selector_type = "round_robin"
kafka_topic_replication_factor = 1
kafka_topic_create_topic_timeout = "30s"
kafka_topic_name_prefix = "greptimedb_wal_topic"
"#;

// Deserialized to MetasrvWalConfig.
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetasrvKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

Expand All @@ -216,29 +181,15 @@ mod tests {
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));

// Deserialized to StandaloneWalConfig.
let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
let expected = StandaloneKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::default(),
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
kafka_topic: KafkaTopicConfig {
num_topics: 32,
selector_type: TopicSelectorType::RoundRobin,
name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
};
assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
}
2 changes: 0 additions & 2 deletions src/common/wal/src/config/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
pub mod common;
pub mod datanode;
pub mod metasrv;
pub mod standalone;

pub use datanode::DatanodeKafkaConfig;
pub use metasrv::MetasrvKafkaConfig;
pub use standalone::StandaloneKafkaConfig;
Loading