From 8b785820f45b3775c9d21973ef0a5f03110cab7a Mon Sep 17 00:00:00 2001
From: Kentaro Hayashi
Date: Wed, 31 Jul 2024 12:30:29 +0900
Subject: [PATCH] out_rdkafka2: add patch for v0.16.0 or later

Signed-off-by: Kentaro Hayashi
---
 lib/fluent/plugin/out_rdkafka2.rb         |  2 +
 lib/fluent/plugin/rdkafka_patch/0_16_0.rb | 55 +++++++++++++++++++++++
 2 files changed, 57 insertions(+)
 create mode 100644 lib/fluent/plugin/rdkafka_patch/0_16_0.rb

diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb
index 7be64cb..15da9a1 100644
--- a/lib/fluent/plugin/out_rdkafka2.rb
+++ b/lib/fluent/plugin/out_rdkafka2.rb
@@ -13,6 +13,8 @@
     require_relative 'rdkafka_patch/0_12_0'
+  elsif rdkafka_version >= Gem::Version.create('0.16.0')
+    require_relative 'rdkafka_patch/0_16_0'
   elsif rdkafka_version >= Gem::Version.create('0.14.0')
     require_relative 'rdkafka_patch/0_14_0'
   end
 rescue LoadError, NameError
   raise "unable to patch rdkafka."
diff --git a/lib/fluent/plugin/rdkafka_patch/0_16_0.rb b/lib/fluent/plugin/rdkafka_patch/0_16_0.rb
new file mode 100644
index 0000000..80b8114
--- /dev/null
+++ b/lib/fluent/plugin/rdkafka_patch/0_16_0.rb
@@ -0,0 +1,55 @@
+class Rdkafka::NativeKafka
+  # return false if the producer was forcefully closed, otherwise return true
+  def close(timeout=nil, object_id=nil)
+    return true if closed?
+
+    synchronize do
+      # Indicate to the outside world that we are closing
+      @closing = true
+
+      thread_status = :unknown
+      if @polling_thread
+        # Indicate to polling thread that we're closing
+        @polling_thread[:closing] = true
+
+        # Wait for the polling thread to finish up,
+        # this can be aborted in practice if this
+        # code runs from a finalizer.
+        thread_status = @polling_thread.join(timeout)
+      end
+
+      # Destroy the client after locking both mutexes
+      @poll_mutex.lock
+
+      # This check prevents a race condition, where we would enter the close in two threads
+      # and after the primary one that held the lock has finished and unlocked, ours would
+      # continue to run, trying to destroy inner twice
+      return unless @inner
+
+      Rdkafka::Bindings.rd_kafka_destroy(@inner)
+      @inner = nil
+      @opaque = nil
+
+      !thread_status.nil?
+    end
+  end
+end
+
+class Rdkafka::Producer
+  def close(timeout = nil)
+    return true if closed?
+    ObjectSpace.undefine_finalizer(self)
+
+    @native_kafka.close(timeout) do
+      # We need to remove the topic reference objects before we destroy the producer,
+      # otherwise they would leak out
+      @topics_refs_map.each_value do |refs|
+        refs.each_value do |ref|
+          Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
+        end
+      end
+    end
+
+    @topics_refs_map.clear
+  end
+end
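
Note (not part of the patch): a minimal sketch of how the timeout-aware close is expected to be used from a caller's side, assuming rdkafka >= 0.16.0 with this patch loaded. The broker address, topic name, and timeout value are placeholders, not taken from the patch.

  require "rdkafka"

  # Build a producer against a placeholder broker.
  producer = Rdkafka::Config.new("bootstrap.servers" => "localhost:9092").producer

  # Deliver one record and wait for acknowledgement.
  producer.produce(topic: "test", payload: "hello").wait

  # With the patch applied, close accepts a timeout (in seconds) that is passed to
  # the polling thread's join; the patched NativeKafka#close returns false when the
  # join times out and the producer is torn down forcefully, true otherwise.
  producer.close(10)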