diff --git a/changelog.d/21129_suppress_warnings_for_kafka_sink.fix.md b/changelog.d/21129_suppress_warnings_for_kafka_sink.fix.md
new file mode 100644
index 0000000000000..843296f7f0540
--- /dev/null
+++ b/changelog.d/21129_suppress_warnings_for_kafka_sink.fix.md
@@ -0,0 +1,3 @@
+The `kafka` sink no longer emits warnings due to applying rdkafka options to a consumer used for the health check. Now it uses the producer client for the health check.
+
+authors: belltoy
diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs
index 5860fa1d291b0..7e42e7ab85d21 100644
--- a/src/sinks/kafka/config.rs
+++ b/src/sinks/kafka/config.rs
@@ -142,16 +142,8 @@ fn example_librdkafka_options() -> HashMap<String, String> {
     ])
 }
 
-/// Used to determine the options to set in configs, since both Kafka consumers and producers have
-/// unique options, they use the same struct, and the error if given the wrong options.
-#[derive(Debug, PartialOrd, PartialEq, Eq)]
-pub enum KafkaRole {
-    Consumer,
-    Producer,
-}
-
 impl KafkaSinkConfig {
-    pub(crate) fn to_rdkafka(&self, kafka_role: KafkaRole) -> crate::Result<ClientConfig> {
+    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
         let mut client_config = ClientConfig::new();
         client_config
             .set("bootstrap.servers", &self.bootstrap_servers)
@@ -164,73 +156,71 @@ impl KafkaSinkConfig {
         self.auth.apply(&mut client_config)?;
 
         // All batch options are producer only.
-        if kafka_role == KafkaRole::Producer {
-            client_config
-                .set("compression.codec", &to_string(self.compression))
-                .set(
-                    "message.timeout.ms",
-                    &self.message_timeout_ms.as_millis().to_string(),
-                );
-
-            if let Some(value) = self.batch.timeout_secs {
-                // Delay in milliseconds to wait for messages in the producer queue to accumulate before
-                // constructing message batches (MessageSets) to transmit to brokers. A higher value
-                // allows larger and more effective (less overhead, improved compression) batches of
-                // messages to accumulate at the expense of increased message delivery latency.
-                // Type: float
-                let key = "queue.buffering.max.ms";
-                if let Some(val) = self.librdkafka_options.get(key) {
-                    return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\
-                        The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\
-                        Please delete one.", key, value, val).into());
-                }
-                debug!(
-                    librdkafka_option = key,
-                    batch_option = "timeout_secs",
-                    value,
-                    "Applying batch option as librdkafka option."
-                );
-                client_config.set(key, &((value * 1000.0).round().to_string()));
+        client_config
+            .set("compression.codec", &to_string(self.compression))
+            .set(
+                "message.timeout.ms",
+                &self.message_timeout_ms.as_millis().to_string(),
+            );
+
+        if let Some(value) = self.batch.timeout_secs {
+            // Delay in milliseconds to wait for messages in the producer queue to accumulate before
+            // constructing message batches (MessageSets) to transmit to brokers. A higher value
+            // allows larger and more effective (less overhead, improved compression) batches of
+            // messages to accumulate at the expense of increased message delivery latency.
+            // Type: float
+            let key = "queue.buffering.max.ms";
+            if let Some(val) = self.librdkafka_options.get(key) {
+                return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`.\
+                    The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`.\
+                    Please delete one.").into());
             }
-            if let Some(value) = self.batch.max_events {
-                // Maximum number of messages batched in one MessageSet. The total MessageSet size is
-                // also limited by batch.size and message.max.bytes.
-                // Type: integer
-                let key = "batch.num.messages";
-                if let Some(val) = self.librdkafka_options.get(key) {
-                    return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\
-                        The config already sets this as `librdkafka_options.batch.num.messages={}`.\
-                        Please delete one.", key, value, val).into());
-                }
-                debug!(
-                    librdkafka_option = key,
-                    batch_option = "max_events",
-                    value,
-                    "Applying batch option as librdkafka option."
-                );
-                client_config.set(key, &value.to_string());
+            debug!(
+                librdkafka_option = key,
+                batch_option = "timeout_secs",
+                value,
+                "Applying batch option as librdkafka option."
+            );
+            client_config.set(key, &((value * 1000.0).round().to_string()));
+        }
+        if let Some(value) = self.batch.max_events {
+            // Maximum number of messages batched in one MessageSet. The total MessageSet size is
+            // also limited by batch.size and message.max.bytes.
+            // Type: integer
+            let key = "batch.num.messages";
+            if let Some(val) = self.librdkafka_options.get(key) {
+                return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`.\
+                    The config already sets this as `librdkafka_options.batch.num.messages={val}`.\
+                    Please delete one.").into());
             }
-            if let Some(value) = self.batch.max_bytes {
-                // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
-                // framing overhead. This limit is applied after the first message has been added to the
-                // batch, regardless of the first message's size, this is to ensure that messages that
-                // exceed batch.size are produced. The total MessageSet size is also limited by
-                // batch.num.messages and message.max.bytes.
-                // Type: integer
-                let key = "batch.size";
-                if let Some(val) = self.librdkafka_options.get(key) {
-                    return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\
-                        The config already sets this as `librdkafka_options.batch.size={}`.\
-                        Please delete one.", key, value, val).into());
-                }
-                debug!(
-                    librdkafka_option = key,
-                    batch_option = "max_bytes",
-                    value,
-                    "Applying batch option as librdkafka option."
-                );
-                client_config.set(key, &value.to_string());
+            debug!(
+                librdkafka_option = key,
+                batch_option = "max_events",
+                value,
+                "Applying batch option as librdkafka option."
+            );
+            client_config.set(key, &value.to_string());
+        }
+        if let Some(value) = self.batch.max_bytes {
+            // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
+            // framing overhead. This limit is applied after the first message has been added to the
+            // batch, regardless of the first message's size, this is to ensure that messages that
+            // exceed batch.size are produced. The total MessageSet size is also limited by
+            // batch.num.messages and message.max.bytes.
+            // Type: integer
+            let key = "batch.size";
+            if let Some(val) = self.librdkafka_options.get(key) {
+                return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`.\
+                    The config already sets this as `librdkafka_options.batch.size={val}`.\
+                    Please delete one.").into());
             }
+            debug!(
+                librdkafka_option = key,
+                batch_option = "max_bytes",
+                value,
+                "Applying batch option as librdkafka option."
+            );
+            client_config.set(key, &value.to_string());
         }
 
         for (key, value) in self.librdkafka_options.iter() {
diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs
index c908979e5c2e2..03b11abae068e 100644
--- a/src/sinks/kafka/sink.rs
+++ b/src/sinks/kafka/sink.rs
@@ -1,7 +1,6 @@
 use rdkafka::{
-    consumer::{BaseConsumer, Consumer},
     error::KafkaError,
-    producer::FutureProducer,
+    producer::{BaseProducer, FutureProducer, Producer},
     ClientConfig,
 };
 use snafu::{ResultExt, Snafu};
@@ -9,7 +8,7 @@ use tokio::time::Duration;
 use tracing::Span;
 use vrl::path::OwnedTargetPath;
 
-use super::config::{KafkaRole, KafkaSinkConfig};
+use super::config::KafkaSinkConfig;
 use crate::{
     kafka::KafkaStatisticsContext,
     sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
@@ -46,7 +45,7 @@ pub(crate) fn create_producer(
 
 impl KafkaSink {
     pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result<Self> {
-        let producer_config = config.to_rdkafka(KafkaRole::Producer)?;
+        let producer_config = config.to_rdkafka()?;
         let producer = create_producer(producer_config)?;
         let transformer = config.encoding.transformer();
         let serializer = config.encoding.build()?;
@@ -105,7 +104,7 @@ impl KafkaSink {
 
 pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
     trace!("Healthcheck started.");
-    let client = config.to_rdkafka(KafkaRole::Consumer).unwrap();
+    let client_config = config.to_rdkafka().unwrap();
     let topic: Option<String> = match config.healthcheck_topic {
         Some(topic) => Some(topic),
         _ => match config.topic.render_string(&LogEvent::from_str_legacy("")) {
@@ -121,10 +120,11 @@ pub(crate) async fn healthcheck(config: KafkaSinkConfig) -> crate::Result<()> {
     };
 
     tokio::task::spawn_blocking(move || {
-        let consumer: BaseConsumer = client.create().unwrap();
+        let producer: BaseProducer = client_config.create().unwrap();
         let topic = topic.as_ref().map(|topic| &topic[..]);
 
-        consumer
+        producer
+            .client()
             .fetch_metadata(topic, Duration::from_secs(3))
             .map(|_| ())
     })
diff --git a/src/sinks/kafka/tests.rs b/src/sinks/kafka/tests.rs
index 6d36e77fd6fd2..2c327f09d42f9 100644
--- a/src/sinks/kafka/tests.rs
+++ b/src/sinks/kafka/tests.rs
@@ -19,11 +19,7 @@ mod integration_test {
         event::{BatchNotifier, BatchStatus},
     };
 
-    use super::super::{
-        config::{KafkaRole, KafkaSinkConfig},
-        sink::KafkaSink,
-        *,
-    };
+    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
     use crate::{
         event::{ObjectMap, Value},
         kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
@@ -190,8 +186,7 @@ mod integration_test {
             headers_key: None,
             acknowledgements: Default::default(),
         };
-        config.clone().to_rdkafka(KafkaRole::Consumer)?;
-        config.clone().to_rdkafka(KafkaRole::Producer)?;
+        config.clone().to_rdkafka()?;
        self::sink::healthcheck(config.clone()).await?;
        KafkaSink::new(config)
    }
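For reference, the producer-based health check adopted above can be sketched in isolation. This is a minimal, hypothetical example, not part of the change set: the `probe_brokers` helper, its parameters, and the standalone `ClientConfig` are illustrative only. It shows the same pattern the sink now uses, building a `BaseProducer` and fetching cluster metadata through the producer's underlying client, so no consumer-only rdkafka options are ever applied. As in the sink, `fetch_metadata` blocks, which is why the real health check wraps it in `tokio::task::spawn_blocking`.

```rust
use std::time::Duration;

use rdkafka::{
    error::KafkaResult,
    producer::{BaseProducer, Producer},
    ClientConfig,
};

/// Hypothetical probe: ask the brokers for metadata via a producer client.
/// An `Err` indicates the cluster (or the given topic) is unreachable.
fn probe_brokers(bootstrap_servers: &str, topic: Option<&str>) -> KafkaResult<()> {
    // A producer-oriented config never triggers "not a consumer option" warnings.
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()?;

    // `Producer::client()` exposes the underlying rdkafka client; its
    // `fetch_metadata` call round-trips to the brokers within the timeout.
    producer
        .client()
        .fetch_metadata(topic, Duration::from_secs(3))
        .map(|_| ())
}
```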