Skip to content

Commit

Permalink
Adding support for AWS IAM authentication for MSK
Browse files Browse the repository at this point in the history
  • Loading branch information
madebydna committed Nov 29, 2024
1 parent f799f61 commit 76bfd37
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class Fluent::Rdkafka2Output < Output
config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"],
:desc => 'Handle some of the error codes should be unrecoverable if specified'

config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK'

config_section :buffer do
config_set_default :chunk_keys, ["topic"]
end
Expand Down Expand Up @@ -205,10 +207,17 @@ def add(level, message = nil)
end
end
}
# HERE -----------------
Rdkafka::Config.logger = log
config = build_config
@rdkafka = Rdkafka::Config.new(config)


if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER"
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
end
# HERE -----------------

if @default_topic.nil?
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
Expand Down Expand Up @@ -289,13 +298,34 @@ def build_config
config[:"sasl.password"] = @password if @password
config[:"enable.idempotence"] = @idempotent if @idempotent

# sasl.mechnisms and security.protocol are set as rdkafka_options
@rdkafka_options.each { |k, v|
config[k.to_sym] = v
}

config
end

def refresh_token(_config, _client_name)
print "refreshing token\n"
client = get_producer
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
token = signer.generate_auth_token

if token
client.oauthbearer_set_token(
token: token.token,
lifetime_ms: token.expiration_time_ms,
principal_name: "kafka-cluster"
)
else
client.oauthbearer_set_token_failure(
"Failed to generate token."
)
end
end

# HERE -----------------
def start
if @share_producer
@shared_producer = @rdkafka.producer
Expand All @@ -306,6 +336,7 @@ def start

super
end
# HERE -----------------

def multi_workers_ready?
true
Expand Down

0 comments on commit 76bfd37

Please sign in to comment.