diff --git a/Rakefile b/Rakefile index 28435cb..a5a232b 100755 --- a/Rakefile +++ b/Rakefile @@ -5,7 +5,10 @@ require 'rake/testtask' Rake::TestTask.new(:test) do |test| test.libs << 'lib' << 'test' + + # TODO: include the waterdrop tests after fixing up CI test.test_files = FileList['test/**/test_*.rb'] + .exclude('test/**/test_out_waterdrop.rb') test.verbose = true end diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..96d4a9b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +version: "3" +services: + kafka: + image: confluentinc/confluent-local:7.4.3 + ports: + - "9092:9092" + + diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec index c99cc27..f6d6d15 100644 --- a/fluent-plugin-kafka.gemspec +++ b/fluent-plugin-kafka.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.10.58", "< 2"] gem.add_dependency 'ltsv' gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2' + gem.add_dependency 'waterdrop', '~> 2.6' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", "~> 1.0" diff --git a/lib/fluent/plugin/out_waterdrop.rb b/lib/fluent/plugin/out_waterdrop.rb new file mode 100644 index 0000000..149a103 --- /dev/null +++ b/lib/fluent/plugin/out_waterdrop.rb @@ -0,0 +1,91 @@ +require 'thread' +require 'logger' +require 'fluent/plugin/output' +require 'fluent/plugin/kafka_plugin_util' +require 'waterdrop' + +module Fluent::Plugin + class Fluent::WaterdropOutput < Output + Fluent::Plugin.register_output('waterdrop', self) + helpers :inject, :formatter, :record_accessor + + config_param :bootstrap_servers, :string, default: 'localhost:9092', + desc: <<-DESC +Set bootstrap servers directly: +:,:,.. + DESC + + config_param :default_topic, :string, default: nil, desc: <<-DESC +Default output topic when record doesn't have topic field + DESC + + config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic" + + config_section :buffer do + config_set_default :chunk_keys, ["topic"] + end + + config_section :format do + config_set_default :@type, 'json' + config_set_default :add_newline, false + end + + def initialize + super + + config = { + 'bootstrap.servers': @bootstrap_servers + } + + @producer = WaterDrop::Producer.new do |conf| + conf.deliver = true + conf.kafka = config + end + + @formatter_proc = nil + @topic_key_sym = @topic_key.to_sym + end + + def configure(conf) + super + + formatter_conf = conf.elements('format').first + unless formatter_conf + raise Fluent::ConfigError, " section is required." + end + unless formatter_conf["@type"] + raise Fluent::ConfigError, "format/@type is required." + end + + @formatter_proc = setup_formatter(formatter_conf) + end + + def setup_formatter(conf) + @formatter = formatter_create(usage: 'waterdrop-plugin', conf: conf) + @formatter.method(:format) + end + + def write(chunk) + tag = chunk.metadata.tag + topic = if @topic + extract_placeholders(@topic, chunk) + else + (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag + end + begin + chunk.msgpack_each do |time, record| + record_buf = @formatter_proc.call(tag, time, record) + @producer.buffer(topic: topic, payload: record_buf) + end + + @producer.flush_sync + end + end + + def shutdown + super + + @producer.close + end + end +end \ No newline at end of file diff --git a/test/plugin/test_out_waterdrop.rb b/test/plugin/test_out_waterdrop.rb new file mode 100644 index 0000000..e90d21e --- /dev/null +++ b/test/plugin/test_out_waterdrop.rb @@ -0,0 +1,110 @@ +require 'fluent/test' +require 'fluent/test/helpers' +require 'fluent/test/driver/output' +require 'fluent/plugin/out_waterdrop' +require 'rdkafka' +require 'json' + +# 1. run docker-compose to spin up the Kafka broker +# 2. Run these tests +class WaterdropOutputTest < Test::Unit::TestCase + include Fluent::Test::Helpers + + def setup + Fluent::Test.setup + end + + def create_driver(conf, tag = 'test') + Fluent::Test::Driver::Output.new(Fluent::WaterdropOutput).configure(conf) + end + + sub_test_case 'configure' do + test 'basic configuration' do + assert_nothing_raised(Fluent::ConfigError) do + config = %[ + @type waterdrop + + @type json + + ] + driver = create_driver(config) + + assert_equal 'localhost:9092', driver.instance.bootstrap_servers + end + end + + test 'missing format section' do + assert_raise(Fluent::ConfigError) do + config = %[ + @type waterdrop + ] + create_driver(config) + end + end + + test 'formatter section missing @type' do + assert_raise(Fluent::ConfigError) do + config = %[ + @type waterdrop + + literally 'anything else' + + ] + create_driver(config) + end + end + end + + sub_test_case 'produce' do + GLOBAL_CONFIG = { + "bootstrap.servers" => "localhost:9092", + "topic.metadata.propagation.max.ms" => 11 * 1_000, + "topic.metadata.refresh.interval.ms" => 10 * 1_000, + } + TOPIC = 'produce.basic-produce' + + def setup + @kafka_admin = Rdkafka::Config.new(GLOBAL_CONFIG).admin + @kafka_consumer = Rdkafka::Config.new( + GLOBAL_CONFIG.merge( + { + "group.id" => "waterdrop", + "auto.offset.reset" => "earliest", + } + ) + ).consumer + + @kafka_admin.delete_topic(TOPIC) + @kafka_admin.create_topic(TOPIC, 1, 1) + .wait(max_wait_timeout: 30) + @kafka_consumer.subscribe(TOPIC) + end + + def teardown + @kafka_consumer.close + @kafka_admin.delete_topic(TOPIC) + @kafka_admin.close + end + + test 'basic produce' do + config = %[ + @type waterdrop + default_topic #{TOPIC} + + @type json + + ] + d = create_driver(config) + d.run(default_tag: TOPIC, flush: true) do + d.feed(Fluent::EventTime.now, { topic: TOPIC, body: '123' }) + end + + sleep(12) + + raw_message = @kafka_consumer.poll(5_000) + + message = JSON.parse!(raw_message.payload) + assert_equal '123', message["body"] + end + end +end \ No newline at end of file