From 54bcb238e38a8406155fa947b4b7d299b675055d Mon Sep 17 00:00:00 2001 From: raihankhan Date: Fri, 12 Jan 2024 17:41:50 +0600 Subject: [PATCH] Update methodnames Signed-off-by: raihankhan --- kafka/client.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/kafka/client.go b/kafka/client.go index f45712e17..216f20cdb 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -112,28 +112,31 @@ func (c *Client) GetPartitionLeaderAddress(partition int32, topic string) (strin return leader.Addr(), nil } -func (a *AdminClient) EnsureKafkaTopic(topic string, topicConfig map[string]*string, partitions int32, replicationFactor int16) error { +func (a *AdminClient) IsTopicExists(topic string) (bool, error) { topics, err := a.ListTopics() if err != nil { klog.Error(err, "Failed to list kafka topics") - return err + return false, err } + _, topicExists := topics[topic] + return topicExists, nil +} - if _, topicExists := topics[topic]; !topicExists { - err = a.CreateTopic(topic, &kafkago.TopicDetail{ - NumPartitions: partitions, - ReplicationFactor: replicationFactor, - ConfigEntries: topicConfig, - }, true) - if err != nil { - klog.Error(err, fmt.Sprintf("Failed to create topic - %s", topic)) - return err - } - klog.Info(fmt.Sprintf("Created topic - %s", topic)) +func (a *AdminClient) CreateKafkaTopic(topic string, topicConfig map[string]*string, partitions int32, replicationFactor int16) error { + err := a.CreateTopic(topic, &kafkago.TopicDetail{ + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + ConfigEntries: topicConfig, + }, true) + if err != nil { + klog.Error(err, fmt.Sprintf("Failed to create topic - %s", topic)) + return err } + klog.Info(fmt.Sprintf("Created topic - %s", topic)) return nil } -func (c *Client) DeleteTopics(topics ...string) { + +func (c *Client) DeleteKafkaTopics(topics ...string) { broker, err := c.Controller() if err != nil { klog.Error(err, "Failed to get controller broker") @@ -149,7 +152,7 @@ func (c *Client) DeleteTopics(topics ...string) { } } -func (p *ProducerClient) SendMessageWithProducer(partition int32, topic, key, message string) (*MessageMetadata, error) { +func (p *ProducerClient) PublishMessages(partition int32, topic, key, message string) (*MessageMetadata, error) { producerMsg := &kafkago.ProducerMessage{ Topic: topic, Key: kafkago.StringEncoder(key),