diff --git a/config.go b/config.go index 8c71cad5..76d8d24f 100644 --- a/config.go +++ b/config.go @@ -25,9 +25,12 @@ import ( // Configuration definition type ClientProfile struct { - ClientID string `gcfg:"client-id"` - TLS bool `gcfg:"tls"` - TLSNoVerify bool `gcfg:"tls-noverify"` + ClientID string `gcfg:"client-id"` + TLS bool `gcfg:"tls"` + TLSNoVerify bool `gcfg:"tls-noverify"` + TLSCertFilePath string `gcfg:"tls-certfilepath"` + TLSKeyFilePath string `gcfg:"tls-keyfilepath"` + TLSCAFilePath string `gcfg:"tls-cafilepath"` } type BurrowConfig struct { General struct { diff --git a/kafka_client.go b/kafka_client.go index 63c89e92..73655fa7 100644 --- a/kafka_client.go +++ b/kafka_client.go @@ -13,6 +13,8 @@ package main import ( "bytes" "crypto/tls" + "crypto/x509" + "io/ioutil" "encoding/binary" "errors" "github.com/Shopify/sarama" @@ -49,7 +51,25 @@ func NewKafkaClient(app *ApplicationContext, cluster string) (*KafkaClient, erro profile := app.Config.Clientprofile[app.Config.Kafka[cluster].Clientprofile] clientConfig.ClientID = profile.ClientID clientConfig.Net.TLS.Enable = profile.TLS - clientConfig.Net.TLS.Config = &tls.Config{} + if profile.TLSCertFilePath == "" || profile.TLSKeyFilePath == "" || profile.TLSCAFilePath == "" { + clientConfig.Net.TLS.Config = &tls.Config{} + } else { + caCert, err := ioutil.ReadFile(profile.TLSCAFilePath) + if err != nil { + return nil, err + } + cert, err := tls.LoadX509KeyPair(profile.TLSCertFilePath, profile.TLSKeyFilePath) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + clientConfig.Net.TLS.Config = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + clientConfig.Net.TLS.Config.BuildNameToCertificate() + } clientConfig.Net.TLS.Config.InsecureSkipVerify = profile.TLSNoVerify sclient, err := sarama.NewClient(app.Config.Kafka[cluster].Brokers, clientConfig)