Skip to content

Commit

Permalink
Merge pull request #501 from dipendra-singh/rdkafka_idempotency
Browse files Browse the repository at this point in the history
out_rdkafka2: enable idempotency for kafka producers
  • Loading branch information
ashie authored Jul 31, 2024
2 parents b6b50a0 + a223581 commit 2dece8b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
discard_kafka_delivery_failed (bool) :default => false (No discard)
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
share_producer (bool) :default => false
idempotent (bool) :default => false

# If you intend to rely on AWS IAM auth to MSK with long lived credentials
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Fluent::Rdkafka2Output < Output
config_param :default_message_key, :string, :default => nil
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
config_param :default_partition, :integer, :default => nil
config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer'
config_param :output_data_type, :string, :default => 'json', :obsoleted => "Use <format> section instead"
config_param :output_include_tag, :bool, :default => false, :obsoleted => "Use <inject> section instead"
config_param :output_include_time, :bool, :default => false, :obsoleted => "Use <inject> section instead"
Expand Down Expand Up @@ -286,6 +287,7 @@ def build_config
config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num
config[:"sasl.username"] = @username if @username
config[:"sasl.password"] = @password if @password
config[:"enable.idempotence"] = @idempotent if @idempotent

@rdkafka_options.each { |k, v|
config[k.to_sym] = v
Expand Down

0 comments on commit 2dece8b

Please sign in to comment.