diff --git a/kafka.go b/kafka.go index 1c7254312..677234587 100644 --- a/kafka.go +++ b/kafka.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "errors" "fmt" "io/ioutil" "log" @@ -50,34 +51,43 @@ type KafkaMessage struct { ReqHeaders map[string]string `json:"Req_Headers,omitempty"` } + // NewTLSConfig loads TLS certificates func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { tlsConfig := tls.Config{} + if clientCertFile != "" && clientKeyFile == "" { + return &tlsConfig, errors.New("Missing key of client certificate in kafka") + } + if clientCertFile == "" && clientKeyFile != "" { + return &tlsConfig, errors.New("missing TLS client certificate in kafka") + } // Load client cert - cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) - if err != nil { - return &tlsConfig, err + if (clientCertFile != "") && (clientKeyFile != "") { + cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} } - tlsConfig.Certificates = []tls.Certificate{cert} - // Load CA cert - caCert, err := ioutil.ReadFile(caCertFile) - if err != nil { - return &tlsConfig, err + if caCertFile != "" { + caCert, err := ioutil.ReadFile(caCertFile) + if err != nil { + return &tlsConfig, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - tlsConfig.RootCAs = caCertPool - - return &tlsConfig, err + return &tlsConfig, nil } // NewKafkaConfig returns Kafka config with or without TLS func NewKafkaConfig(tlsConfig *KafkaTLSConfig) *sarama.Config { config := sarama.NewConfig() // Configuration options go here - if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.ClientCert != "") && (tlsConfig.ClientKey != "") { + if tlsConfig != nil && (tlsConfig.ClientCert != "" || tlsConfig.CACert != "") { config.Net.TLS.Enable = true tlsConfig, err := NewTLSConfig(tlsConfig.ClientCert, tlsConfig.ClientKey, tlsConfig.CACert) if err != nil {