diff --git a/go.mod b/go.mod index fd2f057..1d1bc4a 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/pkg/profile v1.3.0 github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xitonix/flags v0.1.1 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect diff --git a/go.sum b/go.sum index 023a127..f95040f 100644 --- a/go.sum +++ b/go.sum @@ -61,7 +61,9 @@ github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqn github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xitonix/flags v0.1.0 h1:/GFhmS5JiMRuKmyLt5Jv8LUaTQ1UXNvhILJ132/y/yY= github.com/xitonix/flags v0.1.0/go.mod h1:u2wBVLaMlqa31kAZB4eqmq7jFX6SmMR8QBDMB3cP92A= diff --git a/internal/verbosity_level.go b/internal/verbosity_level.go index 3d94dcd..c36bced 100644 --- a/internal/verbosity_level.go +++ b/internal/verbosity_level.go @@ -12,6 +12,8 @@ const ( VeryVerbose // SuperVerbose super verbose mode (-vvv) SuperVerbose + // Chatty extremely verbose mode (-vvvv) + Chatty ) // ToVerbosityLevel converts an integer to verbosity level. @@ -21,8 +23,10 @@ func ToVerbosityLevel(counter int) VerbosityLevel { return Verbose case counter == 2: return VeryVerbose - case counter >= 3: + case counter == 3: return SuperVerbose + case counter >= 4: + return Chatty default: return Forced } diff --git a/kafka/consumer.go b/kafka/consumer.go index 770a47a..322e77a 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -2,7 +2,9 @@ package kafka import ( "context" + "log" "math" + "os" "regexp" "strings" "sync" @@ -41,9 +43,13 @@ func NewConsumer(brokers []string, printer internal.Printer, environment string, option(ops) } + if printer.Level() == internal.Chatty { + sarama.Logger = log.New(os.Stdout, "KAFKA Client: ", log.LstdFlags) + } + client, consumer, err := initClient(brokers, ops) if err != nil { - return nil, errors.Wrap(err, "Failed to initialise kafka client") + return nil, err } return &Consumer{ @@ -343,6 +349,19 @@ func initClient(brokers []string, ops *Options) (sarama.Client, sarama.Consumer, config := sarama.NewConfig() config.Version = version config.Consumer.Return.Errors = true + config.ClientID = "Trubka" + if ops.sasl != nil { + config.Net.SASL.Enable = true + config.Net.SASL.Mechanism = ops.sasl.mechanism + config.Net.SASL.User = ops.sasl.username + config.Net.SASL.Password = ops.sasl.password + config.Net.SASL.SCRAMClientGeneratorFunc = ops.sasl.client + } + + if ops.TLS != nil { + config.Net.TLS.Enable = true + config.Net.TLS.Config = ops.TLS + } client, err := sarama.NewClient(brokers, config) if err != nil { diff --git a/kafka/options.go b/kafka/options.go index ee40d14..78c6df6 100644 --- a/kafka/options.go +++ b/kafka/options.go @@ -1,6 +1,8 @@ package kafka import ( + "crypto/tls" + "github.com/Shopify/sarama" "github.com/xitonix/trubka/internal" @@ -18,6 +20,9 @@ type Options struct { ClusterVersion string // OffsetStore the type responsible to store consumer offsets OffsetStore OffsetStore + // TLS configuration to connect to Kafka cluster. + TLS *tls.Config + sasl *sasl } // NewOptions creates a new Options object with default values. @@ -47,3 +52,17 @@ func WithOffsetStore(store OffsetStore) Option { options.OffsetStore = store } } + +// WithSASL enables SASL authentication. +func WithSASL(mechanism, username, password string) Option { + return func(options *Options) { + options.sasl = newSASL(mechanism, username, password) + } +} + +// WithTLS enables TLS. +func WithTLS(tls *tls.Config) Option { + return func(options *Options) { + options.TLS = tls + } +} diff --git a/kafka/sasl_authentication.go b/kafka/sasl_authentication.go new file mode 100644 index 0000000..5e504d4 --- /dev/null +++ b/kafka/sasl_authentication.go @@ -0,0 +1,54 @@ +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + "strings" + + "github.com/Shopify/sarama" +) + +const ( + SASLMechanismNone = "none" + SASLMechanismPlain = "plain" + SASLMechanismSCRAM256 = "scram-sha-256" + SASLMechanismSCRAM512 = "scram-sha-512" +) + +type sasl struct { + mechanism sarama.SASLMechanism + username string + password string + client func() sarama.SCRAMClient +} + +// This will return nil if the mechanism is not valid. +func newSASL(mechanism, username, password string) *sasl { + switch strings.ToLower(mechanism) { + case SASLMechanismPlain: + return &sasl{ + mechanism: sarama.SASLTypePlaintext, + username: username, + password: password, + } + case SASLMechanismSCRAM256: + hash := func() hash.Hash { return sha256.New() } + return &sasl{ + client: func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: hash} }, + mechanism: sarama.SASLTypeSCRAMSHA256, + username: username, + password: password, + } + case SASLMechanismSCRAM512: + hash := func() hash.Hash { return sha512.New() } + return &sasl{ + client: func() sarama.SCRAMClient { return &xdgSCRAMClient{HashGeneratorFcn: hash} }, + mechanism: sarama.SASLTypeSCRAMSHA512, + username: username, + password: password, + } + default: + return nil + } +} diff --git a/kafka/scram_client.go b/kafka/scram_client.go new file mode 100644 index 0000000..5ff0ba8 --- /dev/null +++ b/kafka/scram_client.go @@ -0,0 +1,29 @@ +package kafka + +import ( + "github.com/xdg/scram" +) + +type xdgSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *xdgSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *xdgSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *xdgSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/main.go b/main.go index 60505d2..f7adfee 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "io" "io/ioutil" @@ -62,6 +64,16 @@ func main() { includeTimeStamp := flags.Bool("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka.").WithShort("T") enableAutoTopicCreation := flags.Bool("auto-topic-creation", `Enables automatic Kafka topic creation before consuming (if it is allowed on the server). Enabling this option in production is not recommended since it may pollute the environment with unwanted topics.`) + saslMechanism := flags.String("sasl-mechanism", "SASL authentication mechanism."). + WithValidRange(true, kafka.SASLMechanismNone, kafka.SASLMechanismPlain, kafka.SASLMechanismSCRAM256, kafka.SASLMechanismSCRAM512). + WithDefault(kafka.SASLMechanismNone) + + saslUsername := flags.String("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none.").WithShort("U") + saslPassword := flags.String("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none.").WithShort("P") + + enableTLS := flags.Bool("tls", "Enables TLS for communicating with the Kafka cluster.") + certCA := flags.String("tls-ca", "An optional certificate authority file for TLS client authentication.") + v := flags.Verbosity("The verbosity level of the tool.").WithKey("-") version := flags.Bool("version", "Prints the current version of Trubka.").WithKey("-") @@ -110,11 +122,21 @@ func main() { exit(err) } + var tlsConfig *tls.Config + if enableTLS.Get() { + tlsConfig, err = configureTLS(certCA.Get()) + if err != nil { + exit(err) + } + } + consumer, err := kafka.NewConsumer( brokers.Get(), prn, environment.Get(), enableAutoTopicCreation.Get(), - kafka.WithClusterVersion(kafkaVersion.Get())) + kafka.WithClusterVersion(kafkaVersion.Get()), + kafka.WithTLS(tlsConfig), + kafka.WithSASL(saslMechanism.Get(), saslUsername.Get(), saslPassword.Get())) if err != nil { exit(err) @@ -189,6 +211,25 @@ func main() { } } +func configureTLS(caFilePath string) (*tls.Config, error) { + caFilePath = strings.TrimSpace(caFilePath) + if len(caFilePath) == 0 { + return &tls.Config{ + InsecureSkipVerify: true, + }, nil + } + caCert, err := ioutil.ReadFile(caFilePath) + if err != nil { + return nil, errors.Wrap(err, "Failed to load the certificate authority file") + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + return &tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: false, + }, nil +} + func getCheckpoint(rewind bool, timeCheckpoint *core.TimeFlag, offsetCheckpoint *core.Int64Flag) *kafka.Checkpoint { cp := kafka.NewCheckpoint(rewind) switch {