Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_rdkafka2: add patch for new version of rdkafka #505

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ jobs:
- ubuntu-latest
rdkafka_versions:
- { min: '>= 0.6.0', max: '< 0.12.0' }
- { min: '>= 0.12.0', max: '>= 0.12.0' }
- { min: '>= 0.12.0', max: '< 0.14.0' }
- { min: '>= 0.14.0', max: '< 0.16.0' }
- { min: '>= 0.16.0', max: '>= 0.16.0' }
bundler_version:
- '2.5.16'
# rdkafka 0.15.2 is the last version which supports Ruby 2.7
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ You need to install rdkafka gem.
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
kenhys marked this conversation as resolved.
Show resolved Hide resolved
use_default_for_unknown_topic (bool) :default => false
use_default_for_unknown_partition_error (bool) :default => false
default_partition_key (string) :default => nil
Expand Down
53 changes: 12 additions & 41 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,19 @@

require 'rdkafka'

# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
# return false if producer is forcefully closed, otherwise return true
def close(timeout=nil)
return unless @native

# Indicate to polling thread that we're closing
@polling_thread[:closing] = true
# Wait for the polling thread to finish up
thread = @polling_thread.join(timeout)

Rdkafka::Bindings.rd_kafka_destroy(@native)

@native = nil

return !thread.nil?
end
end

class Rdkafka::Producer
# return false if producer is forcefully closed, otherwise return true
def close(timeout = nil)
rdkafka_version = Rdkafka::VERSION || '0.0.0'
# Rdkafka version >= 0.12.0 changed its internals
if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
ObjectSpace.undefine_finalizer(self)

return @client.close(timeout)
end

@closing = true
# Wait for the polling thread to finish up
# If the broker isn't alive, the thread doesn't exit
if timeout
thr = @polling_thread.join(timeout)
return !!thr
else
@polling_thread.join
return true
end
begin
  rdkafka_version = Gem::Version::create(Rdkafka::VERSION)
  # Load the monkey-patch matching the installed rdkafka gem.
  #
  # Branches MUST be ordered from newest to oldest: a `>= 0.14.0` test
  # placed before `>= 0.16.0` also matches 0.16.x and makes the 0.16
  # patch unreachable, silently applying the wrong patch.
  #
  # NOTE(review): versions in (0.12.0, 0.14.0) intentionally get no
  # patch here, matching the original dispatch — confirm this is the
  # desired behavior for 0.13.x.
  if rdkafka_version >= Gem::Version.create('0.16.0')
    require_relative 'rdkafka_patch/0_16_0'
  elsif rdkafka_version >= Gem::Version.create('0.14.0')
    require_relative 'rdkafka_patch/0_14_0'
  elsif rdkafka_version == Gem::Version.create('0.12.0')
    require_relative 'rdkafka_patch/0_12_0'
  elsif rdkafka_version < Gem::Version.create('0.12.0')
    require_relative 'rdkafka_patch/0_11_0'
  end
rescue LoadError, NameError
  raise "unable to patch rdkafka."
end

module Fluent::Plugin
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_11_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class Rdkafka::Producer
  # Ask the polling thread to shut down and wait for it.
  #
  # Returns false when the producer had to be forcefully closed (the
  # polling thread did not exit within +timeout+), true otherwise.
  def close(timeout = nil)
    @closing = true

    # With no timeout we wait indefinitely; note that if the broker
    # isn't alive the polling thread may never exit.
    if timeout.nil?
      @polling_thread.join
      true
    else
      !@polling_thread.join(timeout).nil?
    end
  end
end
27 changes: 27 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_12_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# This is required for `rdkafka` version >= 0.12.0
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
class Rdkafka::Producer::Client
  # Stop the background polling thread and release the native
  # librdkafka handle.
  #
  # Returns false when the producer was forcefully closed (the polling
  # thread did not finish within +timeout+), true when it exited
  # cleanly; no-op (returns nil) when already closed.
  def close(timeout = nil)
    return if @native.nil?

    # Tell the polling loop we are shutting down, then give it up to
    # +timeout+ seconds to wind down (nil waits forever).
    @polling_thread[:closing] = true
    joined = @polling_thread.join(timeout)

    Rdkafka::Bindings.rd_kafka_destroy(@native)
    @native = nil

    !joined.nil?
  end
end

class Rdkafka::Producer
  # Delegate shutdown to the underlying client. The GC finalizer is
  # removed first so the native handle cannot be destroyed twice.
  def close(timeout = nil)
    ObjectSpace.undefine_finalizer(self)
    @client.close(timeout)
  end
end
44 changes: 44 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_14_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
class Rdkafka::NativeKafka
  # Close the native librdkafka handle, stopping the polling thread
  # first. The entire shutdown runs inside `synchronize` so concurrent
  # callers serialize on the same lock.
  #
  # return false if producer is forcefully closed, otherwise return true
  def close(timeout=nil, object_id=nil)
    # Idempotent: a second call after a successful close is a no-op.
    return true if closed?

    synchronize do
      # Indicate to the outside world that we are closing
      @closing = true

      # :unknown (truthy) is kept when there is no polling thread, so
      # the method still reports a clean close in that case.
      thread_status = :unknown
      if @polling_thread
        # Indicate to polling thread that we're closing
        @polling_thread[:closing] = true

        # Wait for the polling thread to finish up,
        # this can be aborted in practice if this
        # code runs from a finalizer.
        thread_status = @polling_thread.join(timeout)
      end

      # Destroy the client after locking both mutexes
      # NOTE(review): only @poll_mutex is visibly locked here, and it is
      # never unlocked — presumably intentional since the handle is
      # destroyed below; confirm against upstream rdkafka 0.14.
      @poll_mutex.lock

      # This check prevents a race condition, where we would enter the close in two threads
      # and after unlocking the primary one that hold the lock but finished, ours would be unlocked
      # and would continue to run, trying to destroy inner twice
      if @inner
        Rdkafka::Bindings.rd_kafka_destroy(@inner)
        @inner = nil
        @opaque = nil
      end

      # Truthy when the polling thread terminated (or never existed);
      # nil — i.e. false-ish result — when join timed out.
      !thread_status.nil?
    end
  end
end

class Rdkafka::Producer
  # Remove the GC finalizer (so the native handle is not destroyed a
  # second time) and close the wrapped native handle.
  #
  # Returns true immediately when already closed; otherwise returns
  # whatever NativeKafka#close reports.
  def close(timeout = nil)
    return true if closed?

    ObjectSpace.undefine_finalizer(self)
    @native_kafka.close(timeout)
  end
end
kenhys marked this conversation as resolved.
Show resolved Hide resolved
55 changes: 55 additions & 0 deletions lib/fluent/plugin/rdkafka_patch/0_16_0.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
class Rdkafka::NativeKafka
  # Close the native librdkafka handle, stopping the polling thread
  # first. The shutdown runs inside `synchronize` so concurrent callers
  # serialize on the same lock.
  #
  # return false if producer is forcefully closed, otherwise return true
  def close(timeout=nil, object_id=nil)
    # Idempotent: a second call after a successful close is a no-op.
    return true if closed?

    synchronize do
      # Indicate to the outside world that we are closing
      @closing = true

      # :unknown (truthy) is kept when there is no polling thread, so
      # the method still reports a clean close in that case.
      thread_status = :unknown
      if @polling_thread
        # Indicate to polling thread that we're closing
        @polling_thread[:closing] = true

        # Wait for the polling thread to finish up,
        # this can be aborted in practice if this
        # code runs from a finalizer.
        thread_status = @polling_thread.join(timeout)
      end

      # Destroy the client after locking both mutexes
      # NOTE(review): only @poll_mutex is visibly locked here, and it is
      # never unlocked — presumably intentional since the handle is
      # destroyed below; confirm against upstream rdkafka 0.16.
      @poll_mutex.lock

      # This check prevents a race condition, where we would enter the close in two threads
      # and after unlocking the primary one that hold the lock but finished, ours would be unlocked
      # and would continue to run, trying to destroy inner twice
      #
      # BUGFIX: was misspelled `retun`, which raised NameError whenever
      # this guard was reached. `return` exits #close early (and thereby
      # the synchronize block) when another thread already destroyed
      # the handle.
      return unless @inner

      Rdkafka::Bindings.rd_kafka_destroy(@inner)
      @inner = nil
      @opaque = nil

      # Truthy when the polling thread terminated (or never existed);
      # nil — i.e. false-ish result — when join timed out.
      !thread_status.nil?
    end
  end
end

class Rdkafka::Producer
  # Remove the GC finalizer and close the wrapped native handle,
  # cleaning up cached topic references afterwards.
  #
  # Returns true immediately when already closed; otherwise returns
  # whatever NativeKafka#close reports.
  def close(timeout = nil)
    return true if closed?
    ObjectSpace.undefine_finalizer(self)

    # NOTE(review): the patched NativeKafka#close defined in this same
    # file never yields, so this block appears to be ignored and the
    # topic references below would only be released via
    # @topics_refs_map.clear — confirm against upstream rdkafka 0.16,
    # whose NativeKafka#close does invoke the block before destroying
    # the producer.
    @native_kafka.close(timeout) do
      # We need to remove the topics references objects before we destroy the producer,
      # otherwise they would leak out
      @topics_refs_map.each_value do |refs|
        refs.each_value do |ref|
          Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
        end
      end
    end

    @topics_refs_map.clear
  end
end