diff --git a/kafka/config.go b/kafka/config.go index e0028f93..7e9502cb 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -34,6 +34,8 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) { kafkaConfig.Admin.Timeout = time.Duration(c.Timeout) * time.Second kafkaConfig.Metadata.Full = true // the default, but just being clear kafkaConfig.Metadata.AllowAutoTopicCreation = false + kafkaConfig.Metadata.Retry.Max = 10 + kafkaConfig.Metadata.Retry.Backoff = 250 * time.Millisecond kafkaConfig.Net.Proxy.Enable = true kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment() @@ -93,7 +95,7 @@ func parsePemOrLoadFromFile(input string) (*pem.Block, []byte, error) { if inputBlock == nil { //attempt to load from file - log.Printf("[INFO] Attempting to load from file '%s'", input) + log.Printf("[TRACE] Attempting to load from file '%s'", input) var err error inputBytes, err = os.ReadFile(input) if err != nil { diff --git a/kafka/lazy_client.go b/kafka/lazy_client.go index 7840c1a5..7c661652 100644 --- a/kafka/lazy_client.go +++ b/kafka/lazy_client.go @@ -30,7 +30,10 @@ func (c *LazyClient) init() error { log.Printf("[TRACE] lazy client init %s", c.initErr) } if c.initErr == sarama.ErrBrokerNotAvailable || c.initErr == sarama.ErrOutOfBrokers { + log.Printf("[ERROR] Cannot connect to Kafka broker(s) %v", *(c.Config.BootstrapServers)) + log.Printf("[ERROR] Check if Kafka broker(s) are up and running") if c.Config.TLSEnabled { + log.Printf("[ERROR] Check if Kafka broker(s) are reachable from this machine using TLS") tlsError := c.checkTLSConfig() if tlsError != nil { return fmt.Errorf("%w\n%s", tlsError, c.initErr) @@ -48,14 +51,29 @@ func (c *LazyClient) checkTLSConfig() error { } brokers := *(c.Config.BootstrapServers) - broker := brokers[0] - tlsConf := kafkaConfig.Net.TLS.Config - conn, err := tls.Dial("tcp", broker, tlsConf) - if err != nil { - return err + errs := make([]error, 0, len(brokers)) + + for i := 0; i < len(brokers); i++ { + broker := brokers[i] + tlsConf := kafkaConfig.Net.TLS.Config + conn, err := tls.Dial("tcp", broker, tlsConf) + if err != nil { + errs = append(errs, err) + continue + } + + err = conn.Handshake() + if err != nil { + errs = append(errs, err) + continue + } + } + + if len(errs) > 0 { + return fmt.Errorf("TLS handshake failed for all brokers: %v", errs) } - return conn.Handshake() + return nil } func (c *LazyClient) CreateTopic(t Topic) error {