Skip to content

Commit

Permalink
Merge pull request #481 from raytung/feat/aws-iam-auth
Browse files Browse the repository at this point in the history
feat(out_kafka2): adds support for AWS IAM authentication to MSK usin…
  • Loading branch information
ashie authored Jan 23, 2023
2 parents f1c55d6 + c7fce94 commit 2253efd
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
share_producer (bool) :default => false

# If you intend to rely on AWS IAM auth to MSK with long lived credentials
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
#
# For AWS STS support, see status in
# - https://github.com/zendesk/ruby-kafka/issues/944
# - https://github.com/zendesk/ruby-kafka/pull/951
sasl_aws_msk_iam_access_key_id (string) :default => nil
sasl_aws_msk_iam_secret_key_id (string) :default => nil
sasl_aws_msk_iam_aws_region (string) :default => nil

<format>
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
</format>
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |gem|

gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency 'ltsv'
gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/plugin/kafka_plugin_util.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
module Fluent
module KafkaPluginUtil
module AwsIamSettings
def self.included(klass)
klass.instance_eval do
config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
desc: "AWS access key Id for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
desc: "AWS access key secret for IAM authentication to MSK."
config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
desc: "AWS region for IAM authentication to MSK."
end
end
end

module SSLSettings
def self.included(klass)
klass.instance_eval {
Expand Down
22 changes: 22 additions & 0 deletions lib/fluent/plugin/out_kafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output
config_set_default :@type, 'json'
end

include Fluent::KafkaPluginUtil::AwsIamSettings
include Fluent::KafkaPluginUtil::SSLSettings
include Fluent::KafkaPluginUtil::SaslSettings

Expand All @@ -113,6 +114,7 @@ def initialize
def refresh_client(raise_error = true)
begin
logger = @get_kafka_client_log ? log : nil
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
if @scram_mechanism != nil && @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
Expand All @@ -125,6 +127,26 @@ def refresh_client(raise_error = true)
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
elsif use_long_lived_aws_credentials
@kafka = Kafka.new(
seed_brokers: @seed_brokers,
client_id: @client_id,
logger: logger,
connect_timeout: @connect_timeout,
socket_timeout: @socket_timeout,
ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert),
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname,
resolve_seed_brokers: @resolve_seed_brokers,
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
)
else
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
Expand Down

0 comments on commit 2253efd

Please sign in to comment.