Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Oct 16, 2023
1 parent 419f0b7 commit 3605e2c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
4 changes: 3 additions & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig.Admin.Timeout = time.Duration(c.Timeout) * time.Second
kafkaConfig.Metadata.Full = true // the default, but just being clear
kafkaConfig.Metadata.AllowAutoTopicCreation = false
kafkaConfig.Metadata.Retry.Max = 10
kafkaConfig.Metadata.Retry.Backoff = 250 * time.Millisecond

kafkaConfig.Net.Proxy.Enable = true
kafkaConfig.Net.Proxy.Dialer = proxy.FromEnvironment()
Expand Down Expand Up @@ -93,7 +95,7 @@ func parsePemOrLoadFromFile(input string) (*pem.Block, []byte, error) {

if inputBlock == nil {
//attempt to load from file
log.Printf("[INFO] Attempting to load from file '%s'", input)
log.Printf("[TRACE] Attempting to load from file '%s'", input)
var err error
inputBytes, err = os.ReadFile(input)
if err != nil {
Expand Down
30 changes: 24 additions & 6 deletions kafka/lazy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (c *LazyClient) init() error {
log.Printf("[TRACE] lazy client init %s", c.initErr)
}
if c.initErr == sarama.ErrBrokerNotAvailable || c.initErr == sarama.ErrOutOfBrokers {
log.Printf("[ERROR] Cannot connect to Kafka broker(s) %v", *(c.Config.BootstrapServers))
log.Printf("[ERROR] Check if Kafka broker(s) are up and running")
if c.Config.TLSEnabled {
log.Printf("[ERROR] Check if Kafka broker(s) are reachable from this machine using TLS")
tlsError := c.checkTLSConfig()
if tlsError != nil {
return fmt.Errorf("%w\n%s", tlsError, c.initErr)
Expand All @@ -48,14 +51,29 @@ func (c *LazyClient) checkTLSConfig() error {
}

brokers := *(c.Config.BootstrapServers)
broker := brokers[0]
tlsConf := kafkaConfig.Net.TLS.Config
conn, err := tls.Dial("tcp", broker, tlsConf)
if err != nil {
return err
errs := make([]error, 0, len(brokers))

for i := 0; i < len(brokers); i++ {
broker := brokers[i]
tlsConf := kafkaConfig.Net.TLS.Config
conn, err := tls.Dial("tcp", broker, tlsConf)
if err != nil {
errs = append(errs, err)
continue
}

err = conn.Handshake()
if err != nil {
errs = append(errs, err)
continue
}
}

if len(errs) > 0 {
return fmt.Errorf("TLS handshake failed for all brokers: %v", errs)
}

return conn.Handshake()
return nil
}

func (c *LazyClient) CreateTopic(t Topic) error {
Expand Down

0 comments on commit 3605e2c

Please sign in to comment.