Skip to content

Commit

Permalink
Merge pull request #839 from davidFR/master
Browse files Browse the repository at this point in the history
Fix #838 : Kafka in TLS (not only mutual TLS)
  • Loading branch information
Urban Ishimwe authored Nov 2, 2020
2 parents 358cb78 + f719071 commit 20435af
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -50,34 +51,43 @@ type KafkaMessage struct {
ReqHeaders map[string]string `json:"Req_Headers,omitempty"`
}


// NewTLSConfig loads TLS certificates
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}

if clientCertFile != "" && clientKeyFile == "" {
return &tlsConfig, errors.New("Missing key of client certificate in kafka")
}
if clientCertFile == "" && clientKeyFile != "" {
return &tlsConfig, errors.New("missing TLS client certificate in kafka")
}
// Load client cert
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
if (clientCertFile != "") && (clientKeyFile != "") {
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
tlsConfig.Certificates = []tls.Certificate{cert}

// Load CA cert
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
if caCertFile != "" {
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool

return &tlsConfig, err
return &tlsConfig, nil
}

// NewKafkaConfig returns Kafka config with or without TLS
func NewKafkaConfig(tlsConfig *KafkaTLSConfig) *sarama.Config {
config := sarama.NewConfig()
// Configuration options go here
if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.ClientCert != "") && (tlsConfig.ClientKey != "") {
if tlsConfig != nil && (tlsConfig.ClientCert != "" || tlsConfig.CACert != "") {
config.Net.TLS.Enable = true
tlsConfig, err := NewTLSConfig(tlsConfig.ClientCert, tlsConfig.ClientKey, tlsConfig.CACert)
if err != nil {
Expand Down

0 comments on commit 20435af

Please sign in to comment.