Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ cmd/query/query
cmd/query/query-linux
crossdock/crossdock-linux
run-crossdock.log
.vscode/
101 changes: 100 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package producer

import (
"crypto/tls"
"fmt"

"github.com/Shopify/sarama"
)

Expand All @@ -25,12 +28,108 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
Brokers []string
Authenticators []Authenticator
Metadata bool
Version string
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
var err error
// Last write wins on conflict
for _, auth := range c.Authenticators {
saramaConfig, err = auth.Authenticate(saramaConfig)
if err != nil {
return nil, err
}
}
saramaConfig.Metadata.Full = c.Metadata
if c.Version == "" {
saramaConfig.Version = sarama.MinVersion
} else {
v, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
return nil, err
}
saramaConfig.Version = v
}
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}

// AuthConfiguration ...
type AuthConfiguration interface{}

// SASLAuthConfiguration ...
type SASLAuthConfiguration struct {
Username string
Password string
}

// TLSAuthConfiguration ...
type TLSAuthConfiguration struct {
Enabled bool
Config *tls.Config
}

// Authenticator ...
type Authenticator interface {
Authenticate(config *sarama.Config) (*sarama.Config, error)
}

// SASLAuthenticator ...
type SASLAuthenticator struct {
config *SASLAuthConfiguration
}

// NewSASLAuthenticator ...
func NewSASLAuthenticator(auth AuthConfiguration) (*SASLAuthenticator, error) {
c, ok := auth.(SASLAuthConfiguration)
if !ok {
return nil, fmt.Errorf("cannot type assert AuthConfiguration into SASLAuthConfiguration")
}
return &SASLAuthenticator{
config: &c,
}, nil
}

// Authenticate ...
func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) {
if s.config == nil || config == nil {
return nil, fmt.Errorf("error")
}
config.Net.SASL.Enable = true
config.Net.SASL.User = s.config.Username
config.Net.SASL.Password = s.config.Password
return config, nil
}

// TLSAuthenticator ...
type TLSAuthenticator struct {
config *TLSAuthConfiguration
}

// NewTLSAuthenticator ...
func NewTLSAuthenticator(auth AuthConfiguration) (*TLSAuthenticator, error) {
c, ok := auth.(TLSAuthConfiguration)
if !ok {
return nil, fmt.Errorf("cannot type assert AuthConfiguration into TLSAuthConfiguration")
}
return &TLSAuthenticator{
config: &c,
}, nil
}

// Authenticate ...
func (t *TLSAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) {
if t.config == nil || config == nil {
return nil, fmt.Errorf("error")
}
config.Net.TLS.Enable = t.config.Enabled
if t.config.Config != nil {
config.Net.TLS.Config = t.config.Config
}
return config, nil
}
74 changes: 69 additions & 5 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@ import (
)

const (
configPrefix = "kafka"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
configPrefix = "kafka"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
suffixSaslEnabled = ".sasl.enabled"
suffixSaslUsername = ".sasl.username"
suffixSaslPassword = ".sasl.password"
suffixMetadata = ".metadata"
suffixTLSEnabled = ".tls.enabled"
//TODO Add required TLS config
suffixVersion = ".version"

encodingJSON = "json"
encodingProto = "protobuf"
Expand Down Expand Up @@ -60,12 +67,69 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON),
)
flagSet.Bool(
configPrefix+suffixSaslEnabled,
false,
fmt.Sprintf("Enable SASL configuration"),
)
flagSet.String(
configPrefix+suffixSaslUsername,
"",
fmt.Sprintf("SASL username"),
)
flagSet.String(
configPrefix+suffixSaslPassword,
"",
fmt.Sprintf("SASL password"),
)
flagSet.Bool(
configPrefix+suffixTLSEnabled,
false,
fmt.Sprintf("Enable TLS configuration"),
)
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
auths := make([]producer.Authenticator, 0)
saslEnabled := v.GetBool(configPrefix + suffixSaslEnabled)
if saslEnabled {
authConfig := producer.SASLAuthConfiguration{
Username: v.GetString(configPrefix + suffixSaslUsername),
Password: v.GetString(configPrefix + suffixSaslPassword),
}
auth, err := producer.NewSASLAuthenticator(authConfig)
if err != nil {
panic(fmt.Sprintf("cannot initialize new SASL authenticator: %+v", err))
}
auths = append(auths, auth)
}
tlsEnabled := v.GetBool(configPrefix + suffixTLSEnabled)
if tlsEnabled {
//TODO Build full TLS configuration
authConfig := producer.TLSAuthConfiguration{
Enabled: true,
}
auth, err := producer.NewTLSAuthenticator(authConfig)
if err != nil {
panic(fmt.Sprintf("cannot initialize new TLS authenticator: %+v", err))
}
auths = append(auths, auth)
}
fullMetadata := true
metadata := v.Get(configPrefix + suffixMetadata)
if metadata != nil {
if m, ok := metadata.(bool); !ok {
panic(fmt.Sprintf("config value %s%s must be a bool (true/false)", configPrefix, suffixMetadata))
} else {
fullMetadata = m
}
}
opt.config = producer.Configuration{
Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","),
Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","),
Authenticators: auths,
Metadata: fullMetadata,
Version: v.GetString(configPrefix + suffixVersion),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down