diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb index b306cfb..2038d90 100644 --- a/lib/fluent/plugin/out_rdkafka2.rb +++ b/lib/fluent/plugin/out_rdkafka2.rb @@ -101,6 +101,8 @@ class Fluent::Rdkafka2Output < Output config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], :desc => 'Handle some of the error codes should be unrecoverable if specified' + config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK' + config_section :buffer do config_set_default :chunk_keys, ["topic"] end @@ -205,10 +207,17 @@ def add(level, message = nil) end end } + # HERE ----------------- Rdkafka::Config.logger = log config = build_config @rdkafka = Rdkafka::Config.new(config) + + if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER" + Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) + end + # HERE ----------------- + if @default_topic.nil? if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true" @@ -289,6 +298,7 @@ def build_config config[:"sasl.password"] = @password if @password config[:"enable.idempotence"] = @idempotent if @idempotent + # sasl.mechnisms and security.protocol are set as rdkafka_options @rdkafka_options.each { |k, v| config[k.to_sym] = v } @@ -296,6 +306,26 @@ def build_config config end + def refresh_token(_config, _client_name) + print "refreshing token\n" + client = get_producer + signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) + token = signer.generate_auth_token + + if token + client.oauthbearer_set_token( + token: token.token, + lifetime_ms: token.expiration_time_ms, + principal_name: "kafka-cluster" + ) + else + client.oauthbearer_set_token_failure( + "Failed to generate token." + ) + end + end + + # HERE ----------------- def start if @share_producer @shared_producer = @rdkafka.producer @@ -306,6 +336,7 @@ def start super end + # HERE ----------------- def multi_workers_ready? true