revert: lz4 compression (#4329)
* Revert "test: revert lz4 compression"

This reverts commit 180dda1.

* refactor: remove compression field
WenyXu authored Jul 10, 2024
1 parent 33ed745 · commit 52a9a74
Showing 4 changed files with 1 addition and 15 deletions.
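Net effect of the commit: the `compression` knob disappears from both Kafka WAL config structs, and the log store's producer now always writes LZ4-compressed batches. A rough sketch of the slimmed-down config shape, using stand-in field types rather than the real GreptimeDB definitions:

use std::time::Duration;

// Sketch of DatanodeKafkaConfig after this commit: no `compression`
// field is left for callers (or config files) to set.
pub struct DatanodeKafkaConfig {
    pub broker_endpoints: Vec<String>,
    pub max_batch_bytes: u64, // ReadableSize in the real code
    pub consumer_wait_timeout: Duration,
    // backoff settings elided
}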
src/common/wal/src/config.rs (4 changes: 0 additions & 4 deletions)
@@ -100,7 +100,6 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
             StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
             StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
                 broker_endpoints: config.broker_endpoints,
-                compression: config.compression,
                 max_batch_bytes: config.max_batch_bytes,
                 consumer_wait_timeout: config.consumer_wait_timeout,
                 backoff: config.backoff,
@@ -114,7 +113,6 @@ mod tests {
     use std::time::Duration;
 
     use common_base::readable_size::ReadableSize;
-    use rskafka::client::partition::Compression;
 
     use super::*;
     use crate::config::kafka::common::BackoffConfig;
@@ -207,7 +205,6 @@ mod tests {
         let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
         let expected = DatanodeKafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            compression: Compression::NoCompression,
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
@@ -229,7 +226,6 @@ mod tests {
             num_partitions: 1,
             replication_factor: 1,
             create_topic_timeout: Duration::from_secs(30),
-            compression: Compression::NoCompression,
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
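The first hunk edits the `StandaloneWalConfig` to `DatanodeWalConfig` conversion, which copies fields one-for-one; with `compression` gone from both sides, the mapping simply loses one line. A self-contained analog with simplified stand-in structs (not the real definitions):

struct StandaloneKafkaConfig {
    broker_endpoints: Vec<String>,
    max_batch_bytes: u64,
}

struct DatanodeKafkaConfig {
    broker_endpoints: Vec<String>,
    max_batch_bytes: u64,
}

impl From<StandaloneKafkaConfig> for DatanodeKafkaConfig {
    fn from(config: StandaloneKafkaConfig) -> Self {
        // Field-for-field mapping; the removed `compression:
        // config.compression` line would have no source field anymore.
        Self {
            broker_endpoints: config.broker_endpoints,
            max_batch_bytes: config.max_batch_bytes,
        }
    }
}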
src/common/wal/src/config/kafka/datanode.rs (5 changes: 0 additions & 5 deletions)
@@ -15,7 +15,6 @@
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
 use serde::{Deserialize, Serialize};
 
 use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -27,9 +26,6 @@ use crate::BROKER_ENDPOINT;
 pub struct DatanodeKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec<String>,
-    /// The compression algorithm used to compress kafka records.
-    #[serde(skip)]
-    pub compression: Compression,
     /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
     #[serde(alias = "max_batch_size")]
@@ -46,7 +42,6 @@ impl Default for DatanodeKafkaConfig {
     fn default() -> Self {
         Self {
             broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
-            compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
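One reason this removal is painless: the deleted field carried `#[serde(skip)]`, so it was never written to serialized config and was ignored on deserialization, meaning no existing config file could have set it. A self-contained demo of that serde behavior with a stand-in struct (requires the serde and toml crates; names here are illustrative):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    #[serde(skip)]
    compression: String, // stand-in for rskafka's Compression enum
}

fn main() {
    let cfg: KafkaConfig =
        toml::from_str(r#"broker_endpoints = ["127.0.0.1:9092"]"#).unwrap();
    // A skipped field is never read from the file and falls back to
    // Default, so deleting it cannot invalidate existing configs.
    assert_eq!(cfg.compression, String::default());
    println!("{cfg:?}");
}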
src/common/wal/src/config/kafka/standalone.rs (5 changes: 0 additions & 5 deletions)
@@ -15,7 +15,6 @@
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
 use serde::{Deserialize, Serialize};
 
 use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -40,9 +39,6 @@ pub struct StandaloneKafkaConfig {
     /// The timeout of topic creation.
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
-    /// The compression algorithm used to compress kafka records.
-    #[serde(skip)]
-    pub compression: Compression,
     /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
     #[serde(alias = "max_batch_size")]
@@ -67,7 +63,6 @@ impl Default for StandaloneKafkaConfig {
             num_partitions: 1,
             replication_factor,
             create_topic_timeout: Duration::from_secs(30),
-            compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
src/log-store/src/kafka/client_manager.rs (2 changes: 1 addition & 1 deletion)
@@ -98,7 +98,7 @@ impl ClientManager {
             producer_channel_size: REQUEST_BATCH_SIZE * 2,
             producer_request_batch_size: REQUEST_BATCH_SIZE,
             flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
-            compression: config.compression,
+            compression: Compression::Lz4,
         })
     }
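With the config field gone, `Compression::Lz4` is now passed unconditionally where the producer is built. For context, a minimal sketch of producing to Kafka with LZ4 through rskafka's partition client; the topic name and record contents are made up, the API shape follows rskafka's README (tokio runtime assumed), and LZ4 support sits behind rskafka's `compression-lz4` cargo feature:

use std::collections::BTreeMap;

use chrono::Utc;
use rskafka::client::{
    partition::{Compression, UnknownTopicHandling},
    ClientBuilder,
};
use rskafka::record::Record;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the cluster and bind a client to one topic partition.
    let client = ClientBuilder::new(vec!["127.0.0.1:9092".to_owned()])
        .build()
        .await?;
    let partition = client
        .partition_client("wal_topic", 0, UnknownTopicHandling::Retry)
        .await?;

    let record = Record {
        key: None,
        value: Some(b"wal entry bytes".to_vec()),
        headers: BTreeMap::new(),
        timestamp: Utc::now(),
    };

    // The same choice this commit hardcodes in ClientManager.
    partition.produce(vec![record], Compression::Lz4).await?;
    Ok(())
}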
