From b0f3a10f5bbd548195092878ef25d2b6737d4feb Mon Sep 17 00:00:00 2001 From: Dipendra Singh Date: Fri, 1 Dec 2023 13:55:18 -0800 Subject: [PATCH] out_kafka2: Broker pool to take care of fetching metadata Signed-off-by: Dipendra Singh --- lib/fluent/plugin/kafka_producer_ext.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/kafka_producer_ext.rb b/lib/fluent/plugin/kafka_producer_ext.rb index c58fccb..e1b70a2 100644 --- a/lib/fluent/plugin/kafka_producer_ext.rb +++ b/lib/fluent/plugin/kafka_producer_ext.rb @@ -93,6 +93,10 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor @partitioner = partitioner + + # The set of topics that are produced to. + @target_topics = Set.new + # A buffer organized by topic/partition. @buffer = MessageBuffer.new @@ -116,7 +120,8 @@ def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_ if @transaction_manager.transactional? && !@transaction_manager.in_transaction? raise 'You must trigger begin_transaction before producing messages' end - + + @target_topics.add(topic) @pending_message_queue.write(message) nil @@ -187,7 +192,7 @@ def transaction def deliver_messages_with_retries attempt = 0 - #@cluster.add_target_topics(@target_topics) + @cluster.add_target_topics(@target_topics) operation = ProduceOperation.new( cluster: @cluster,