fix(kafka sink): Use rdkafka::client::Client instead of Consumer (#21129)

* fix(kafka sink): Use rdkafka::client::Client instead of Consumer for Kafka sink healthcheck

Discussed in #20510

* style(kafka sink): Use string interpolation in error string formatting

* docs: add changelog fragment
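
For readers unfamiliar with the style change in the second bullet, a minimal illustration (with hypothetical values) of positional `format!` arguments versus the inline, captured-identifier interpolation used in the updated error strings:

```rust
fn main() {
    let (key, value) = ("queue.buffering.max.ms", "500");
    // Old style: positional placeholders with trailing arguments.
    let positional = format!("librdkafka_options.{}={}", key, value);
    // New style: identifiers captured directly inside the braces.
    let interpolated = format!("librdkafka_options.{key}={value}");
    assert_eq!(positional, interpolated);
}
```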
belltoy committed Aug 22, 2024
1 parent 82fd2a5 commit 66b55fe
Showing 4 changed files with 75 additions and 87 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21129_suppress_warnings_for_kafka_sink.fix.md
@@ -0,0 +1,3 @@
The `kafka` sink no longer emits warnings due to applying rdkafka options to a consumer used for the health check. Now it uses the producer client for the health check.

authors: belltoy
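
As a standalone illustration of the approach described in the changelog fragment above, here is a minimal sketch of a producer-based healthcheck, assuming the `rdkafka` crate and a reachable broker; the function and parameter names are illustrative, not the exact Vector code.

```rust
use std::time::Duration;

use rdkafka::{
    error::KafkaError,
    producer::{BaseProducer, Producer},
    ClientConfig,
};

// Build a producer from the same producer-oriented options the sink uses,
// then reuse its underlying rdkafka::client::Client to fetch metadata.
// No Consumer is created, so no warnings about producer-only options.
fn kafka_healthcheck(bootstrap_servers: &str, topic: Option<&str>) -> Result<(), KafkaError> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()?;

    producer
        .client()
        .fetch_metadata(topic, Duration::from_secs(3))
        .map(|_| ())
}
```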
136 changes: 63 additions & 73 deletions src/sinks/kafka/config.rs
@@ -142,16 +142,8 @@ fn example_librdkafka_options() -> HashMap<String, String> {
])
}

/// Used to determine the options to set in configs, since both Kafka consumers and producers have
/// unique options, they use the same struct, and the error if given the wrong options.
#[derive(Debug, PartialOrd, PartialEq, Eq)]
pub enum KafkaRole {
Consumer,
Producer,
}

impl KafkaSinkConfig {
pub(crate) fn to_rdkafka(&self, kafka_role: KafkaRole) -> crate::Result<ClientConfig> {
pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &self.bootstrap_servers)
@@ -164,73 +156,71 @@ impl KafkaSinkConfig {
self.auth.apply(&mut client_config)?;

// All batch options are producer only.
if kafka_role == KafkaRole::Producer {
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
client_config
.set("compression.codec", &to_string(self.compression))
.set(
"message.timeout.ms",
&self.message_timeout_ms.as_millis().to_string(),
);

if let Some(value) = self.batch.timeout_secs {
// Delay in milliseconds to wait for messages in the producer queue to accumulate before
// constructing message batches (MessageSets) to transmit to brokers. A higher value
// allows larger and more effective (less overhead, improved compression) batches of
// messages to accumulate at the expense of increased message delivery latency.
// Type: float
let key = "queue.buffering.max.ms";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`.\
Please delete one.").into());
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.num.messages={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
debug!(
librdkafka_option = key,
batch_option = "timeout_secs",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &((value * 1000.0).round().to_string()));
}
if let Some(value) = self.batch.max_events {
// Maximum number of messages batched in one MessageSet. The total MessageSet size is
// also limited by batch.size and message.max.bytes.
// Type: integer
let key = "batch.num.messages";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.batch.num.messages={val}`.\
Please delete one.").into());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
The config already sets this as `librdkafka_options.batch.size={}`.\
Please delete one.", key, value, val).into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
debug!(
librdkafka_option = key,
batch_option = "max_events",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}
if let Some(value) = self.batch.max_bytes {
// Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
// framing overhead. This limit is applied after the first message has been added to the
// batch, regardless of the first message's size, this is to ensure that messages that
// exceed batch.size are produced. The total MessageSet size is also limited by
// batch.num.messages and message.max.bytes.
// Type: integer
let key = "batch.size";
if let Some(val) = self.librdkafka_options.get(key) {
return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`.\
The config already sets this as `librdkafka_options.batch.size={val}`.\
Please delete one.").into());
}
debug!(
librdkafka_option = key,
batch_option = "max_bytes",
value,
"Applying batch option as librdkafka option."
);
client_config.set(key, &value.to_string());
}

for (key, value) in self.librdkafka_options.iter() {
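To summarize the mapping that the config.rs hunk above validates, here is a small hypothetical helper (not part of the commit) relating each sink-level batch setting to the single librdkafka producer option it controls; configuring both for the same key is rejected with the errors shown above.

```rust
// Hypothetical helper: each Vector batch setting maps to exactly one
// librdkafka producer option, so specifying both is a conflict.
fn librdkafka_key(batch_option: &str) -> Option<&'static str> {
    match batch_option {
        "timeout_secs" => Some("queue.buffering.max.ms"), // seconds, converted to milliseconds
        "max_events" => Some("batch.num.messages"),
        "max_bytes" => Some("batch.size"),
        _ => None,
    }
}
```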
14 changes: 7 additions & 7 deletions src/sinks/kafka/sink.rs
@@ -1,15 +1,14 @@
use rdkafka::{
consumer::{BaseConsumer, Consumer},
error::KafkaError,
producer::FutureProducer,
producer::{BaseProducer, FutureProducer, Producer},
ClientConfig,
};
use snafu::{ResultExt, Snafu};
use tokio::time::Duration;
use tracing::Span;
use vrl::path::OwnedTargetPath;

use super::config::{KafkaRole, KafkaSinkConfig};
use super::config::KafkaSinkConfig;
use crate::{
kafka::KafkaStatisticsContext,
sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
@@ -46,7 +45,7 @@ pub(crate) fn create_producer(

impl KafkaSink {
pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result<Self> {
let producer_config = config.to_rdkafka(KafkaRole::Producer)?;
let producer_config = config.to_rdkafka()?;
let producer = create_producer(producer_config)?;
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
@@ -105,7 +104,7 @@ impl KafkaSink {

pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
trace!("Healthcheck started.");
let client = config.to_rdkafka(KafkaRole::Consumer).unwrap();
let client_config = config.to_rdkafka().unwrap();
let topic: Option<String> = match config.healthcheck_topic {
Some(topic) => Some(topic),
_ => match config.topic.render_string(&LogEvent::from_str_legacy("")) {
@@ -121,10 +120,11 @@ pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
};

tokio::task::spawn_blocking(move || {
let consumer: BaseConsumer = client.create().unwrap();
let producer: BaseProducer = client_config.create().unwrap();
let topic = topic.as_ref().map(|topic| &topic[..]);

consumer
producer
.client()
.fetch_metadata(topic, Duration::from_secs(3))
.map(|_| ())
})
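For context on the sink.rs change, a minimal sketch (hypothetical helper name, simplified signatures) of the resulting design: a single producer-oriented `ClientConfig` backs both the sink's `FutureProducer` and the blocking `BaseProducer` that the healthcheck uses only for metadata fetches.

```rust
use rdkafka::{
    error::KafkaError,
    producer::{BaseProducer, FutureProducer},
    ClientConfig,
};

// Both clients are created from the same ClientConfig, so every configured
// option is a valid producer option and no consumer is ever constructed.
fn build_clients(config: &ClientConfig) -> Result<(FutureProducer, BaseProducer), KafkaError> {
    let sink_producer: FutureProducer = config.create()?; // send path
    let healthcheck_producer: BaseProducer = config.create()?; // metadata-only healthcheck
    Ok((sink_producer, healthcheck_producer))
}
```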
9 changes: 2 additions & 7 deletions src/sinks/kafka/tests.rs
@@ -19,11 +19,7 @@ mod integration_test {
event::{BatchNotifier, BatchStatus},
};

use super::super::{
config::{KafkaRole, KafkaSinkConfig},
sink::KafkaSink,
*,
};
use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
use crate::{
event::{ObjectMap, Value},
kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
@@ -190,8 +186,7 @@ mod integration_test {
headers_key: None,
acknowledgements: Default::default(),
};
config.clone().to_rdkafka(KafkaRole::Consumer)?;
config.clone().to_rdkafka(KafkaRole::Producer)?;
config.clone().to_rdkafka()?;
self::sink::healthcheck(config.clone()).await?;
KafkaSink::new(config)
}