From 988fe6f485c86cefd4e418497d0a41b067bd4688 Mon Sep 17 00:00:00 2001 From: Yan Bo <32193417+boboyan@users.noreply.github.com> Date: Fri, 3 Nov 2017 03:31:23 -0500 Subject: [PATCH 1/2] Update consumer.go fix: error use for print --- kafka-go-demo/src/services/consumer/consumer.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/kafka-go-demo/src/services/consumer/consumer.go b/kafka-go-demo/src/services/consumer/consumer.go index 4408d8f..a8ad474 100644 --- a/kafka-go-demo/src/services/consumer/consumer.go +++ b/kafka-go-demo/src/services/consumer/consumer.go @@ -53,16 +53,12 @@ func init() { clusterCfg.Version = sarama.V0_10_0_0 if err = clusterCfg.Validate(); err != nil { - msg := fmt.Sprintf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err) - fmt.Println(msg) - panic(msg) + log.Panicf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err) } consumer, err = cluster.NewConsumer(cfg.Servers, cfg.ConsumerId, cfg.Topics, clusterCfg) if err != nil { - msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) - fmt.Println(msg) - panic(msg) + log.Panicf("Create kafka consumer error: %v. config: %v", err, clusterCfg) } sig = make(chan os.Signal, 1) @@ -79,16 +75,16 @@ func consume() { case msg, more := <-consumer.Messages(): if more { - fmt.Println("kafka consumer msg: %v", *msg) + fmt.Println("kafka consumer msg: %v\n", *msg) consumer.MarkOffset(msg, "") // mark message as processed } case err, more := <-consumer.Errors(): if more { - fmt.Println("Kafka consumer error: %v", err.Error()) + fmt.Println("Kafka consumer error: %v\n", err.Error()) } case ntf, more := <-consumer.Notifications(): if more { - fmt.Println("Kafka consumer rebalance: %v", ntf) + fmt.Println("Kafka consumer rebalance: %v\n", ntf) } case <-sig: fmt.Errorf("Stop consumer server...") From 3bb66212d897f3f2c9d273a215a84c82a92aa8f0 Mon Sep 17 00:00:00 2001 From: Yan Bo <32193417+boboyan@users.noreply.github.com> Date: Fri, 3 Nov 2017 03:32:31 -0500 Subject: [PATCH 2/2] Update consumer.go error use for print --- kafka-go-demo/src/services/consumer/consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-go-demo/src/services/consumer/consumer.go b/kafka-go-demo/src/services/consumer/consumer.go index a8ad474..be1845b 100644 --- a/kafka-go-demo/src/services/consumer/consumer.go +++ b/kafka-go-demo/src/services/consumer/consumer.go @@ -75,16 +75,16 @@ func consume() { case msg, more := <-consumer.Messages(): if more { - fmt.Println("kafka consumer msg: %v\n", *msg) + fmt.Printf("kafka consumer msg: %v\n", *msg) consumer.MarkOffset(msg, "") // mark message as processed } case err, more := <-consumer.Errors(): if more { - fmt.Println("Kafka consumer error: %v\n", err.Error()) + fmt.Printf("Kafka consumer error: %v\n", err.Error()) } case ntf, more := <-consumer.Notifications(): if more { - fmt.Println("Kafka consumer rebalance: %v\n", ntf) + fmt.Printf("Kafka consumer rebalance: %v\n", ntf) } case <-sig: fmt.Errorf("Stop consumer server...")