From 0cc18d5aba26c4339a322a86c010133d8d7d99a6 Mon Sep 17 00:00:00 2001 From: vas78 Date: Mon, 28 Nov 2016 17:13:02 +0100 Subject: [PATCH] Added certificate and CA support in TLS config --- config.go | 9 ++++++--- kafka_client.go | 22 +++++++++++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index 8c71cad5..76d8d24f 100644 --- a/config.go +++ b/config.go @@ -25,9 +25,12 @@ import ( // Configuration definition type ClientProfile struct { - ClientID string `gcfg:"client-id"` - TLS bool `gcfg:"tls"` - TLSNoVerify bool `gcfg:"tls-noverify"` + ClientID string `gcfg:"client-id"` + TLS bool `gcfg:"tls"` + TLSNoVerify bool `gcfg:"tls-noverify"` + TLSCertFilePath string `gcfg:"tls-certfilepath"` + TLSKeyFilePath string `gcfg:"tls-keyfilepath"` + TLSCAFilePath string `gcfg:"tls-cafilepath"` } type BurrowConfig struct { General struct { diff --git a/kafka_client.go b/kafka_client.go index 63c89e92..ec02b23a 100644 --- a/kafka_client.go +++ b/kafka_client.go @@ -13,6 +13,8 @@ package main import ( "bytes" "crypto/tls" + "crypto/x509" + "io/ioutil" "encoding/binary" "errors" "github.com/Shopify/sarama" @@ -49,7 +51,25 @@ func NewKafkaClient(app *ApplicationContext, cluster string) (*KafkaClient, erro profile := app.Config.Clientprofile[app.Config.Kafka[cluster].Clientprofile] clientConfig.ClientID = profile.ClientID clientConfig.Net.TLS.Enable = profile.TLS - clientConfig.Net.TLS.Config = &tls.Config{} + if profile.TLSCertFilePath != "" { + caCert, err := ioutil.ReadFile(profile.TLSCAFilePath) + if err != nil { + return nil, err + } + cert, err := tls.LoadX509KeyPair(profile.TLSCertFilePath, profile.TLSKeyFilePath) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + clientConfig.Net.TLS.Config = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + clientConfig.Net.TLS.Config.BuildNameToCertificate() + } else { + clientConfig.Net.TLS.Config = &tls.Config{} + } clientConfig.Net.TLS.Config.InsecureSkipVerify = profile.TLSNoVerify sclient, err := sarama.NewClient(app.Config.Kafka[cluster].Brokers, clientConfig)