Skip to content

Commit

Permalink
Added certificate and CA support in TLS config
Browse files Browse the repository at this point in the history
  • Loading branch information
vas78 committed Nov 28, 2016
1 parent 1e4b0d8 commit 0cc18d5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
9 changes: 6 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (

// Configuration definition
type ClientProfile struct {
ClientID string `gcfg:"client-id"`
TLS bool `gcfg:"tls"`
TLSNoVerify bool `gcfg:"tls-noverify"`
ClientID string `gcfg:"client-id"`
TLS bool `gcfg:"tls"`
TLSNoVerify bool `gcfg:"tls-noverify"`
TLSCertFilePath string `gcfg:"tls-certfilepath"`
TLSKeyFilePath string `gcfg:"tls-keyfilepath"`
TLSCAFilePath string `gcfg:"tls-cafilepath"`
}
type BurrowConfig struct {
General struct {
Expand Down
22 changes: 21 additions & 1 deletion kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"encoding/binary"
"errors"
"github.com/Shopify/sarama"
Expand Down Expand Up @@ -49,7 +51,25 @@ func NewKafkaClient(app *ApplicationContext, cluster string) (*KafkaClient, erro
profile := app.Config.Clientprofile[app.Config.Kafka[cluster].Clientprofile]
clientConfig.ClientID = profile.ClientID
clientConfig.Net.TLS.Enable = profile.TLS
clientConfig.Net.TLS.Config = &tls.Config{}
if profile.TLSCertFilePath != "" {
caCert, err := ioutil.ReadFile(profile.TLSCAFilePath)
if err != nil {
return nil, err
}
cert, err := tls.LoadX509KeyPair(profile.TLSCertFilePath, profile.TLSKeyFilePath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
clientConfig.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
clientConfig.Net.TLS.Config.BuildNameToCertificate()
} else {
clientConfig.Net.TLS.Config = &tls.Config{}
}
clientConfig.Net.TLS.Config.InsecureSkipVerify = profile.TLSNoVerify

sclient, err := sarama.NewClient(app.Config.Kafka[cluster].Brokers, clientConfig)
Expand Down

0 comments on commit 0cc18d5

Please sign in to comment.