diff --git a/README.md b/README.md index ae336c2..e4ad29d 100644 --- a/README.md +++ b/README.md @@ -517,6 +517,7 @@ You need to install rdkafka gem. exclude_topic_key (bool) :default => false exclude_partition_key (bool) :default => false discard_kafka_delivery_failed (bool) :default => false (No discard) + discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard) use_event_time (bool) :default => false # same with kafka2 @@ -559,6 +560,9 @@ You need to install rdkafka gem. max_enqueue_bytes_per_second (integer) :default => nil +`rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter: +- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`. + If you use v0.12, use `rdkafka` instead. diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index 044efe6..0fc18e7 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -112,6 +112,7 @@ class Fluent::Rdkafka2Output < Output config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for rdkafka timestamp' config_param :max_send_limit_bytes, :size, :default => nil config_param :discard_kafka_delivery_failed, :bool, :default => false + config_param :discard_kafka_delivery_failed_regex, :regexp, :default => nil config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms' config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages' config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes' @@ -461,9 +462,13 @@ def write(chunk) if @discard_kafka_delivery_failed log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag else - log.warn "Send exception occurred: #{e} at #{e.backtrace.first}" - # Raise exception to retry sendind messages - raise e + if @discard_kafka_delivery_failed_regex != nil && @discard_kafka_delivery_failed_regex.match?(e.to_s) + log.warn "Delivery failed and matched regexp pattern #{@discard_kafka_delivery_failed_regex}. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag + else + log.warn "Send exception occurred: #{e} at #{e.backtrace.first}" + # Raise exception to retry sendind messages + raise e + end end ensure @writing_threads_mutex.synchronize { @writing_threads.delete(Thread.current) }