Skip to content

Commit

Permalink
Fix issues revealed in test setup; Add fork info to README
Browse files Browse the repository at this point in the history
  • Loading branch information
madebydna committed Dec 2, 2024
1 parent 8c7ac80 commit ed87e48
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in fluent-plugin-kafka.gemspec
gemspec

gem 'json', '2.7.3' # override of 2.7.4 version
gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
41 changes: 40 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,45 @@ If you want to use zookeeper related parameters, you also need to install zookee

## Usage

### :exclamation: In this fork: MSK IAM Authentication Support for `rdkafka2` Output Type

This fork adds support for using MSK IAM authentication with the `rdkafka2` output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library.

The `aws-msk-iam-sasl-signer-ruby` library provides an [example](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby/tree/main/examples/rdkafka) for generating the OAuthBearer token using rdkafka, which is one of the core Kafka libraries supported by the Fluentd fluent-plugin-kafka plugin. This fork integrates that example into the `Fluent::Rdkafka2Output` class, enabling AWS IAM authentication.

The key change is the inclusion of a refresh callback:
```ruby
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
```
This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster.

#### Configuration Example
To enable this feature, configure your Fluentd input as follows:

```
<match *>
@type rdkafka2
# Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication)
brokers <broker_addresses>
# Topic to write events to
topic_key test-topic-1
default_topic test-topic-1
# AWS Region (required)
aws_msk_region us-east-1
# Use a shared producer for the connection (required)
share_producer true
# MSK IAM authentication settings (required)
rdkafka_options {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "OAUTHBEARER"
}
</match>
```
With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.

### Common parameters

#### SSL authentication
Expand Down Expand Up @@ -563,7 +602,7 @@ You need to install rdkafka gem.
</match>

`rdkafka2` supports `discard_kafka_delivery_failed_regex` parameter:
- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.
- `discard_kafka_delivery_failed_regex` - default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as `/unknown_topic/`.

If you use v0.12, use `rdkafka` instead.

Expand Down
1 change: 0 additions & 1 deletion fluent-plugin-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Gem::Specification.new do |gem|
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
gem.add_dependency 'rdkafka'
gem.add_dependency 'aws-msk-iam-sasl-signer'
gem.add_dependency 'json', '2.7.3'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
Expand Down
16 changes: 13 additions & 3 deletions lib/fluent/plugin/out_rdkafka2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'fluent/plugin/kafka_plugin_util'

require 'rdkafka'
require 'aws_msk_iam_sasl_signer'

begin
rdkafka_version = Gem::Version::create(Rdkafka::VERSION)
Expand Down Expand Up @@ -307,8 +308,14 @@ def build_config
end

def refresh_token(_config, _client_name)
print "refreshing token\n"
log.info("+--- Refreshing token")
client = get_producer
# This will happen once upon initialization and is expected to fail, as the producer isnt set yet
# We will set the token manually after creation and after that this refresh method will work
unless client
log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)")
return
end
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
token = signer.generate_auth_token

Expand All @@ -325,18 +332,21 @@ def refresh_token(_config, _client_name)
end
end

# HERE -----------------
def start
if @share_producer
@shared_producer = @rdkafka.producer
log.info("Created shared producer")
if @aws_msk_region
refresh_token(nil, nil)
log.info("Set initial token for shared producer")
end
else
@producers = {}
@producers_mutex = Mutex.new
end

super
end
# HERE -----------------

def multi_workers_ready?
true
Expand Down

0 comments on commit ed87e48

Please sign in to comment.