Skip to content

Commit

Permalink
Remove refresh metadata
Browse files Browse the repository at this point in the history
Signed-off-by: obaydullahmhs <[email protected]>
  • Loading branch information
obaydullahmhs committed Dec 29, 2023
1 parent cd0d824 commit f9537b3
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ func (c *Client) RefreshTopicMetadata(topics ...string) error {
}

func (c *Client) GetPartitionLeaderAddress(partition int32, topic string) (string, error) {
if err := c.RefreshTopicMetadata(topic); err != nil {
return "", err
}
//if err := c.RefreshTopicMetadata(topic); err != nil {
// return "", err
//}
leader, err := c.Leader(topic, partition)
if err != nil {
klog.Error(err, "Failed to get leader", "partition", partition)
return "", err
}
err = c.RefreshBrokers([]string{leader.Addr()})
if err != nil {
klog.Error(err, fmt.Sprintf("Failed to refresh broker for %s topic", topic))
return "", err
}
//err = c.RefreshBrokers([]string{leader.Addr()})
//if err != nil {
// klog.Error(err, fmt.Sprintf("Failed to refresh broker for %s topic", topic))
// return "", err
//}
return leader.Addr(), nil
}

Expand All @@ -121,9 +121,9 @@ func (a *AdminClient) EnsureKafkaTopic(topic string, topicConfig map[string]*str
return nil
}
func (c *Client) DeleteTopics(topics ...string) {
broker, err := c.RefreshController()
broker, err := c.Controller()
if err != nil {
klog.Error(err, "Failed to refresh controller for kafka-health topic")
klog.Error(err, "Failed to get controller broker")
return
}
_, err = broker.DeleteTopics(&kafkago.DeleteTopicsRequest{
Expand All @@ -135,11 +135,11 @@ func (c *Client) DeleteTopics(topics ...string) {
return
}

err = c.RefreshTopicMetadata(topics...)
if err != nil {
klog.Error("Failed to refresh topic metadata")
return
}
//err = c.RefreshTopicMetadata(topics...)
//if err != nil {
// klog.Error("Failed to refresh topic metadata")
// return
//}
}

func (p *ProducerClient) SendMessageWithProducer(partition int32, topic, key, message string) (*MessageMetadata, error) {
Expand Down

0 comments on commit f9537b3

Please sign in to comment.