Skip to content

Commit

Permalink
out_rdkafka2: add patch for new version of rdkafka
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Tych <[email protected]>
Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed Jul 31, 2024
1 parent d4e1964 commit fde1f2a
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 45 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ You need to install rdkafka gem.
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
use_default_for_unknown_topic (bool) :default => false
use_default_for_unknown_partition_error (bool) :default => false
default_partition_key (string) :default => nil
Expand Down
54 changes: 10 additions & 44 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,17 @@

require 'rdkafka'

# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil)
return unless @native

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
thread = @polling_thread.join(timeout)

Rdkafka::Bindings.rd_kafka_destroy(@native)

@native = nil

return !thread.nil?
end
end

class Rdkafka::Producer
# return false if producer is forcefully closed, otherwise return true
def close(timeout = nil)
rdkafka_version = Rdkafka::VERSION || '0.0.0'
# Rdkafka version >= 0.12.0 changed its internals
# but reverted in >= 0.13.0
gem_version = Gem::Version::create(rdkafka_version)
if gem_version >= Gem::Version.create('0.12.0') and
gem_version <= Gem::Version.create('0.12.1')
ObjectSpace.undefine_finalizer(self)

return @client.close(timeout)
end

@closing = true
# Wait for the polling thread to finish up
# If the broker isn't alive, the thread doesn't exit
if timeout
thr = @polling_thread.join(timeout)
return !!thr
else
@polling_thread.join
return true
end
begin
rdkafka_version = Gem::Version::create(Rdkafka::VERSION)
if rdkafka_version < Gem::Version.create('0.12.0')
require_relative 'rdkafka_patch/0_11_0'
elsif rdkafka_version == Gem::Version.create('0.12.0')
require_relative 'rdkafka_patch/0_12_0'
elsif rdkafka_version >= Gem::Version.create('0.14.0')
require_relative 'rdkafka_patch/0_14_0'
end
rescue LoadError, NameError
raise "unable to patch rdkafka."
end

module Fluent::Plugin
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_11_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class Rdkafka::Producer
# return false if producer is forcefully closed, otherwise return true
def close(timeout = nil)
@closing = true
# Wait for the polling thread to finish up
# If the broker isn't alive, the thread doesn't exit
if timeout
thr = @polling_thread.join(timeout)
return !!thr
else
@polling_thread.join
return true
end
end
end
27 changes: 27 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_12_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil)
return unless @native

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
thread = @polling_thread.join(timeout)

Rdkafka::Bindings.rd_kafka_destroy(@native)

@native = nil

return !thread.nil?
end
end

class Rdkafka::Producer
def close(timeout = nil)
ObjectSpace.undefine_finalizer(self)

return @client.close(timeout)
end
end
44 changes: 44 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_14_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
class Rdkafka::NativeKafka
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil, object_id=nil)
return true if closed?

synchronize do
# Indicate to the outside world that we are closing
@closing = true

thread_status = :unknown
if @polling_thread
# Indicate to polling thread that we're closing
@polling_thread[:closing] = true

# Wait for the polling thread to finish up,
# this can be aborted in practice if this
# code runs from a finalizer.
thread_status = @polling_thread.join(timeout)
end

# Destroy the client after locking both mutexes
@poll_mutex.lock

# This check prevents a race condition, where we would enter the close in two threads
# and after unlocking the primary one that hold the lock but finished, ours would be unlocked
# and would continue to run, trying to destroy inner twice
if @inner
Rdkafka::Bindings.rd_kafka_destroy(@inner)
@inner = nil
@opaque = nil
end

!thread_status.nil?
end
end
end

class Rdkafka::Producer
def close(timeout = nil)
return true if closed?
ObjectSpace.undefine_finalizer(self)
@native_kafka.close(timeout)
end
end

0 comments on commit fde1f2a

Please sign in to comment.