Skip to content

Commit

Permalink
SASL and TLS support
Browse files Browse the repository at this point in the history
  • Loading branch information
xitonix committed Aug 6, 2019
1 parent e95d060 commit a9dc69c
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 3 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xitonix/flags v0.1.1
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xitonix/flags v0.1.0 h1:/GFhmS5JiMRuKmyLt5Jv8LUaTQ1UXNvhILJ132/y/yY=
github.com/xitonix/flags v0.1.0/go.mod h1:u2wBVLaMlqa31kAZB4eqmq7jFX6SmMR8QBDMB3cP92A=
Expand Down
6 changes: 5 additions & 1 deletion internal/verbosity_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const (
VeryVerbose
// SuperVerbose super verbose mode (-vvv)
SuperVerbose
// Chatty extremely verbose mode (-vvvv)
Chatty
)

// ToVerbosityLevel converts an integer to verbosity level.
Expand All @@ -21,8 +23,10 @@ func ToVerbosityLevel(counter int) VerbosityLevel {
return Verbose
case counter == 2:
return VeryVerbose
case counter >= 3:
case counter == 3:
return SuperVerbose
case counter >= 4:
return Chatty
default:
return Forced
}
Expand Down
21 changes: 20 additions & 1 deletion kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"context"
"log"
"math"
"os"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -41,9 +43,13 @@ func NewConsumer(brokers []string, printer internal.Printer, environment string,
option(ops)
}

if printer.Level() == internal.Chatty {
sarama.Logger = log.New(os.Stdout, "KAFKA Client: ", log.LstdFlags)
}

client, consumer, err := initClient(brokers, ops)
if err != nil {
return nil, errors.Wrap(err, "Failed to initialise kafka client")
return nil, err
}

return &Consumer{
Expand Down Expand Up @@ -343,6 +349,19 @@ func initClient(brokers []string, ops *Options) (sarama.Client, sarama.Consumer,
config := sarama.NewConfig()
config.Version = version
config.Consumer.Return.Errors = true
config.ClientID = "Trubka"
if ops.sasl != nil {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = ops.sasl.mechanism
config.Net.SASL.User = ops.sasl.username
config.Net.SASL.Password = ops.sasl.password
config.Net.SASL.SCRAMClientGeneratorFunc = ops.sasl.client
}

if ops.TLS != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = ops.TLS
}

client, err := sarama.NewClient(brokers, config)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions kafka/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kafka

import (
"crypto/tls"

"github.com/Shopify/sarama"

"github.com/xitonix/trubka/internal"
Expand All @@ -18,6 +20,9 @@ type Options struct {
ClusterVersion string
// OffsetStore the type responsible to store consumer offsets
OffsetStore OffsetStore
// TLS configuration to connect to Kafka cluster.
TLS *tls.Config
sasl *sasl
}

// NewOptions creates a new Options object with default values.
Expand Down Expand Up @@ -47,3 +52,17 @@ func WithOffsetStore(store OffsetStore) Option {
options.OffsetStore = store
}
}

// WithSASL enables SASL authentication.
func WithSASL(mechanism, username, password string) Option {
return func(options *Options) {
options.sasl = newSASL(mechanism, username, password)
}
}

// WithTLS enables TLS.
func WithTLS(tls *tls.Config) Option {
return func(options *Options) {
options.TLS = tls
}
}
54 changes: 54 additions & 0 deletions kafka/sasl_authentication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package kafka

import (
"crypto/sha256"
"crypto/sha512"
"hash"
"strings"

"github.com/Shopify/sarama"
)

const (
SASLMechanismNone = "none"
SASLMechanismPlain = "plain"
SASLMechanismSCRAM256 = "scram-sha-256"
SASLMechanismSCRAM512 = "scram-sha-512"
)

type sasl struct {
mechanism sarama.SASLMechanism
username string
password string
client func() sarama.SCRAMClient
}

// This will return nil if the mechanism is not valid.
func newSASL(mechanism, username, password string) *sasl {
switch strings.ToLower(mechanism) {
case SASLMechanismPlain:
return &sasl{
mechanism: sarama.SASLTypePlaintext,
username: username,
password: password,
}
case SASLMechanismSCRAM256:
hash := func() hash.Hash { return sha256.New() }
return &sasl{
client: func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: hash} },
mechanism: sarama.SASLTypeSCRAMSHA256,
username: username,
password: password,
}
case SASLMechanismSCRAM512:
hash := func() hash.Hash { return sha512.New() }
return &sasl{
client: func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: hash} },
mechanism: sarama.SASLTypeSCRAMSHA512,
username: username,
password: password,
}
default:
return nil
}
}
29 changes: 29 additions & 0 deletions kafka/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka

import (
"github.com/xdg/scram"
)

type xdgSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *xdgSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *xdgSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *xdgSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
43 changes: 42 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -62,6 +64,16 @@ func main() {
includeTimeStamp := flags.Bool("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka.").WithShort("T")
enableAutoTopicCreation := flags.Bool("auto-topic-creation", `Enables automatic Kafka topic creation before consuming (if it is allowed on the server).
Enabling this option in production is not recommended since it may pollute the environment with unwanted topics.`)
saslMechanism := flags.String("sasl-mechanism", "SASL authentication mechanism.").
WithValidRange(true, kafka.SASLMechanismNone, kafka.SASLMechanismPlain, kafka.SASLMechanismSCRAM256, kafka.SASLMechanismSCRAM512).
WithDefault(kafka.SASLMechanismNone)

saslUsername := flags.String("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none.").WithShort("U")
saslPassword := flags.String("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none.").WithShort("P")

enableTLS := flags.Bool("tls", "Enables TLS for communicating with the Kafka cluster.")
certCA := flags.String("tls-ca", "An optional certificate authority file for TLS client authentication.")

v := flags.Verbosity("The verbosity level of the tool.").WithKey("-")
version := flags.Bool("version", "Prints the current version of Trubka.").WithKey("-")

Expand Down Expand Up @@ -110,11 +122,21 @@ func main() {
exit(err)
}

var tlsConfig *tls.Config
if enableTLS.Get() {
tlsConfig, err = configureTLS(certCA.Get())
if err != nil {
exit(err)
}
}

consumer, err := kafka.NewConsumer(
brokers.Get(), prn,
environment.Get(),
enableAutoTopicCreation.Get(),
kafka.WithClusterVersion(kafkaVersion.Get()))
kafka.WithClusterVersion(kafkaVersion.Get()),
kafka.WithTLS(tlsConfig),
kafka.WithSASL(saslMechanism.Get(), saslUsername.Get(), saslPassword.Get()))

if err != nil {
exit(err)
Expand Down Expand Up @@ -189,6 +211,25 @@ func main() {
}
}

func configureTLS(caFilePath string) (*tls.Config, error) {
caFilePath = strings.TrimSpace(caFilePath)
if len(caFilePath) == 0 {
return &tls.Config{
InsecureSkipVerify: true,
}, nil
}
caCert, err := ioutil.ReadFile(caFilePath)
if err != nil {
return nil, errors.Wrap(err, "Failed to load the certificate authority file")
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false,
}, nil
}

func getCheckpoint(rewind bool, timeCheckpoint *core.TimeFlag, offsetCheckpoint *core.Int64Flag) *kafka.Checkpoint {
cp := kafka.NewCheckpoint(rewind)
switch {
Expand Down

0 comments on commit a9dc69c

Please sign in to comment.