Skip to content

Commit

Permalink
Update methodnames
Browse files Browse the repository at this point in the history
Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan committed Jan 12, 2024
1 parent 4e7165b commit 54bcb23
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,31 @@ func (c *Client) GetPartitionLeaderAddress(partition int32, topic string) (strin
return leader.Addr(), nil
}

func (a *AdminClient) EnsureKafkaTopic(topic string, topicConfig map[string]*string, partitions int32, replicationFactor int16) error {
func (a *AdminClient) IsTopicExists(topic string) (bool, error) {
topics, err := a.ListTopics()
if err != nil {
klog.Error(err, "Failed to list kafka topics")
return err
return false, err
}
_, topicExists := topics[topic]
return topicExists, nil
}

if _, topicExists := topics[topic]; !topicExists {
err = a.CreateTopic(topic, &kafkago.TopicDetail{
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: topicConfig,
}, true)
if err != nil {
klog.Error(err, fmt.Sprintf("Failed to create topic - %s", topic))
return err
}
klog.Info(fmt.Sprintf("Created topic - %s", topic))
func (a *AdminClient) CreateKafkaTopic(topic string, topicConfig map[string]*string, partitions int32, replicationFactor int16) error {
err := a.CreateTopic(topic, &kafkago.TopicDetail{
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: topicConfig,
}, true)
if err != nil {
klog.Error(err, fmt.Sprintf("Failed to create topic - %s", topic))
return err
}
klog.Info(fmt.Sprintf("Created topic - %s", topic))
return nil
}
func (c *Client) DeleteTopics(topics ...string) {

func (c *Client) DeleteKafkaTopics(topics ...string) {
broker, err := c.Controller()
if err != nil {
klog.Error(err, "Failed to get controller broker")
Expand All @@ -149,7 +152,7 @@ func (c *Client) DeleteTopics(topics ...string) {
}
}

func (p *ProducerClient) SendMessageWithProducer(partition int32, topic, key, message string) (*MessageMetadata, error) {
func (p *ProducerClient) PublishMessages(partition int32, topic, key, message string) (*MessageMetadata, error) {
producerMsg := &kafkago.ProducerMessage{
Topic: topic,
Key: kafkago.StringEncoder(key),
Expand Down

0 comments on commit 54bcb23

Please sign in to comment.