Skip to content

Commit

Permalink
Add partition key to produce command (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpicque authored Apr 15, 2022
1 parent ea45164 commit 39304f9
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ func produceCmd(c *cli.Cmd) {
var (
headers = c.StringsOpt("H header", nil, "message header <key=value>")
message = c.StringOpt("m message", "", "message message")
key = c.StringOpt("k key", "", "key key")

topics = c.Strings(cli.StringsArg{
Name: "TOPIC",
Desc: "topic(s) to consume from",
})
)

c.Spec = "[-H...] [-m=<message|->] TOPIC..."
c.Spec = "[-H...] [-m=<message|->] [-k=<key>] TOPIC..."

c.Action = func() {
cfg := config(*useSSL, *sslCAFile, *sslCertFile, *sslKeyFile)

if *message == "" || *message == "-" {
*message = readStdin()
}
produce(*cfg, splitFlatten(*bootstrapServers), splitFlatten(*topics), *headers, *message)
produce(*cfg, splitFlatten(*bootstrapServers), splitFlatten(*topics), *headers, *message, *key)
}
}

func produce(config cluster.Config, bootstrapServers []string, topics []string, headers []string, message string) {
func produce(config cluster.Config, bootstrapServers []string, topics []string, headers []string, message string, key string) {
producer, err := sarama.NewSyncProducer(bootstrapServers, &config.Config)
die(err)

Expand Down Expand Up @@ -68,6 +69,7 @@ func produce(config cluster.Config, bootstrapServers []string, topics []string,
kafkaMessage := sarama.ProducerMessage{
Topic: topic,
Headers: kafkaHeaders,
Key: sarama.StringEncoder(key),
Value: sarama.StringEncoder(message),
}

Expand All @@ -79,10 +81,11 @@ func produce(config cluster.Config, bootstrapServers []string, topics []string,
}
}
fmt.Printf("(Payload):\n---\n%s\n---\n", message)
_, _, err = producer.SendMessage(&kafkaMessage)
var partition, offset, err = producer.SendMessage(&kafkaMessage)
if err != nil {
fmt.Printf("error: %+v\n", err)
}
fmt.Printf("Payload sent to partition %+v - resulting offset %+v\n", partition, offset)
}
}

Expand Down

0 comments on commit 39304f9

Please sign in to comment.