Skip to content

Commit

Permalink
Merge pull request linkedin#160 from vas78/master
Browse files Browse the repository at this point in the history
Add extended support for TLS
  • Loading branch information
toddpalino authored Mar 2, 2017
2 parents 2adac06 + a5e3464 commit dce021a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
9 changes: 6 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (

// Configuration definition
type ClientProfile struct {
ClientID string `gcfg:"client-id"`
TLS bool `gcfg:"tls"`
TLSNoVerify bool `gcfg:"tls-noverify"`
ClientID string `gcfg:"client-id"`
TLS bool `gcfg:"tls"`
TLSNoVerify bool `gcfg:"tls-noverify"`
TLSCertFilePath string `gcfg:"tls-certfilepath"`
TLSKeyFilePath string `gcfg:"tls-keyfilepath"`
TLSCAFilePath string `gcfg:"tls-cafilepath"`
}
type BurrowConfig struct {
General struct {
Expand Down
22 changes: 21 additions & 1 deletion kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"encoding/binary"
"errors"
"github.com/Shopify/sarama"
Expand Down Expand Up @@ -49,7 +51,25 @@ func NewKafkaClient(app *ApplicationContext, cluster string) (*KafkaClient, erro
profile := app.Config.Clientprofile[app.Config.Kafka[cluster].Clientprofile]
clientConfig.ClientID = profile.ClientID
clientConfig.Net.TLS.Enable = profile.TLS
clientConfig.Net.TLS.Config = &tls.Config{}
if profile.TLSCertFilePath == "" || profile.TLSKeyFilePath == "" || profile.TLSCAFilePath == "" {
clientConfig.Net.TLS.Config = &tls.Config{}
} else {
caCert, err := ioutil.ReadFile(profile.TLSCAFilePath)
if err != nil {
return nil, err
}
cert, err := tls.LoadX509KeyPair(profile.TLSCertFilePath, profile.TLSKeyFilePath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
clientConfig.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
clientConfig.Net.TLS.Config.BuildNameToCertificate()
}
clientConfig.Net.TLS.Config.InsecureSkipVerify = profile.TLSNoVerify

sclient, err := sarama.NewClient(app.Config.Kafka[cluster].Brokers, clientConfig)
Expand Down

0 comments on commit dce021a

Please sign in to comment.