diff --git a/check/health_check.go b/check/health_check.go index 04e023f..ebc141a 100644 --- a/check/health_check.go +++ b/check/health_check.go @@ -1,6 +1,7 @@ package check import ( + "io/ioutil" "math/rand" "time" @@ -34,6 +35,9 @@ type HealthCheckConfig struct { brokerHost string brokerID uint brokerPort uint + brokerCACertPath string + brokerClientCertPath string + brokerClientKeyPath string zookeeperConnect string statusServerPort uint } @@ -112,17 +116,42 @@ func (check *HealthCheck) CheckHealth(brokerUpdates chan<- Update, clusterUpdate func newUpdate(report StatusReport, name string) Update { data, err := report.Json() if err != nil { - log.Warn("Error while marshaling %s status: %s", name, err.Error()) + log.Warnf("Error while marshaling %s status: %s", name, err.Error()) data = simpleStatus(report.Summary()) } return Update{report.Summary(), data} } -func (check *HealthCheck) brokerConfig() kafka.BrokerConf { +func (check *HealthCheck) brokerConfig() (kafka.BrokerConf, error) { config := kafka.NewBrokerConf("health-check-client") config.DialRetryLimit = 1 config.DialRetryWait = check.config.CheckTimeout - return config + + if check.config.brokerCACertPath != "" && + check.config.brokerClientCertPath != "" && + check.config.brokerClientKeyPath != "" { + + ca, err := ioutil.ReadFile(check.config.brokerCACertPath) + if err != nil { + return kafka.BrokerConf{}, err + } + + cert, err := ioutil.ReadFile(check.config.brokerClientCertPath) + if err != nil { + return kafka.BrokerConf{}, err + } + + key, err := ioutil.ReadFile(check.config.brokerClientKeyPath) + if err != nil { + return kafka.BrokerConf{}, err + } + + config.TLSCa = ca + config.TLSCert = cert + config.TLSKey = key + } + + return config, nil } func (check *HealthCheck) consumerConfig() kafka.ConsumerConf { diff --git a/check/parse_args.go b/check/parse_args.go index 0805dea..1d7a90e 100644 --- a/check/parse_args.go +++ b/check/parse_args.go @@ -16,6 +16,9 @@ func (check *HealthCheck) ParseCommandLineArguments() { flag.StringVar(&check.config.brokerHost, "broker-host", "localhost", "ip address or hostname of broker host") flag.UintVar(&check.config.brokerID, "broker-id", 0, "id of the Kafka broker to health check") flag.UintVar(&check.config.brokerPort, "broker-port", 9092, "Kafka broker port") + flag.StringVar(&check.config.brokerCACertPath, "broker-ca-cert", "", "Kafka broker TLS CA certificate path") + flag.StringVar(&check.config.brokerClientCertPath, "broker-client-cert", "", "Kafka broker TLS client certificate path") + flag.StringVar(&check.config.brokerClientKeyPath, "broker-client-key", "", "Kafka broker TLS client key path") flag.UintVar(&check.config.statusServerPort, "server-port", 8000, "port to open for http health status queries") flag.StringVar(&check.config.zookeeperConnect, "zookeeper", "", "ZooKeeper connect string (e.g. node1:2181,node2:2181,.../chroot)") flag.StringVar(&check.config.topicName, "topic", "", "name of the topic to use - use one per broker, defaults to broker--health-check") diff --git a/check/setup.go b/check/setup.go index 1ac9e81..1c482f0 100644 --- a/check/setup.go +++ b/check/setup.go @@ -32,7 +32,13 @@ func (check *HealthCheck) tryConnectOnce(createBrokerTopic, createReplicationTop pauseTime := check.config.retryInterval // connect to kafka cluster connectString := []string{fmt.Sprintf("%s:%d", check.config.brokerHost, check.config.brokerPort)} - err := check.broker.Dial(connectString, check.brokerConfig()) + + brokerConfig, err := check.brokerConfig() + if err != nil { + + } + + err = check.broker.Dial(connectString, brokerConfig) if err != nil { log.Printf("unable to connect to broker, retrying in %s (%s)", pauseTime.String(), err) return err diff --git a/vendor/vendor.json b/vendor/vendor.json index 880a8d4..d312061 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -13,29 +13,34 @@ "revision": "bd3c8e81be01eef76d4b503f5e687d2d1354d2d9" }, { - "checksumSHA1": "Y9kbUqEbt5vUbqcuJea5PLmqtCI=", + "checksumSHA1": "h1d2lPZf6j2dW/mIqVnd1RdykDo=", "path": "github.com/golang/snappy", - "revision": "b62d312cd2a7474ccb7eb06a4686f0560e4ba7fb" + "revision": "2e65f85255dbc3072edf28d6b5b8efc472979f5a", + "revisionTime": "2018-05-18T05:39:59Z" }, { - "checksumSHA1": "LCo45KOYMPpJKgxlj3mlMVF1zN0=", + "checksumSHA1": "5TtDLqXQnf8LkzUBLcWX96KLtOM=", "path": "github.com/optiopay/kafka", - "revision": "5ea5ed9f82995911f606b346b370ffd233363950" + "revision": "969d39f8acbefd704dea8ee8fede3f4413f5ed9e", + "revisionTime": "2018-08-28T12:26:47Z" }, { - "checksumSHA1": "pI0IkJbYND5BK1oAF5LmByjq3C8=", + "checksumSHA1": "+BXYn7eHWlyGDhnNX4uJZowZbwU=", "path": "github.com/optiopay/kafka/integration", - "revision": "5ea5ed9f82995911f606b346b370ffd233363950" + "revision": "969d39f8acbefd704dea8ee8fede3f4413f5ed9e", + "revisionTime": "2018-08-28T12:26:47Z" }, { - "checksumSHA1": "0RLEpjhmQspBVOF3LjfMQGc9XeQ=", + "checksumSHA1": "6TiW3uQo5CvvsO16NP6ny98dzwI=", "path": "github.com/optiopay/kafka/kafkatest", - "revision": "5ea5ed9f82995911f606b346b370ffd233363950" + "revision": "969d39f8acbefd704dea8ee8fede3f4413f5ed9e", + "revisionTime": "2018-08-28T12:26:47Z" }, { - "checksumSHA1": "FRHMZLdFcsD9j3ZfCXfUN2TFBm0=", + "checksumSHA1": "HoMmqVgV6Fp294HEqeHROSoOeAc=", "path": "github.com/optiopay/kafka/proto", - "revision": "5ea5ed9f82995911f606b346b370ffd233363950" + "revision": "969d39f8acbefd704dea8ee8fede3f4413f5ed9e", + "revisionTime": "2018-08-28T12:26:47Z" }, { "checksumSHA1": "ynJSWoF6v+3zMnh9R0QmmG6iGV8=",