From 09c1624f20e308246a129169c4d7fef4529b4bf4 Mon Sep 17 00:00:00 2001 From: dinglei Date: Tue, 2 Jun 2020 16:26:55 +0800 Subject: [PATCH] Support log level set and add more log in offset. (#489) * Add log print for offset manager by broker * modify log level for offset * Support log level set --- consumer/offset_store.go | 19 ++++++++++++++++--- rlog/log.go | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 44a05971..17f5d76a 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -255,6 +255,12 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) { rlog.LogKeyUnderlayError: err.Error(), "offset": off, }) + } else { + rlog.Info("update offset to broker success", map[string]interface{}{ + rlog.LogKeyConsumerGroup: r.group, + rlog.LogKeyMessageQueue: mq.String(), + "offset": off, + }) } } } @@ -264,8 +270,9 @@ func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) { defer r.mutex.Unlock() delete(r.OffsetTable, *mq) - rlog.Info("delete mq from offset table", map[string]interface{}{ - rlog.LogKeyMessageQueue: mq, + rlog.Warning("delete mq from offset table", map[string]interface{}{ + rlog.LogKeyConsumerGroup: r.group, + rlog.LogKeyMessageQueue: mq, }) } @@ -286,13 +293,19 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i case _ReadFromStore: off, err := r.fetchConsumeOffsetFromBroker(r.group, mq) if err != nil { - rlog.Error("fecth offset of mq error", map[string]interface{}{ + rlog.Error("fecth offset of mq from broker error", map[string]interface{}{ + rlog.LogKeyConsumerGroup: r.group, rlog.LogKeyMessageQueue: mq.String(), rlog.LogKeyUnderlayError: err, }) r.mutex.RUnlock() return -1 } + rlog.Warning("fecth offset of mq from broker success", map[string]interface{}{ + rlog.LogKeyConsumerGroup: r.group, + rlog.LogKeyMessageQueue: mq.String(), + "offset": off, + }) r.mutex.RUnlock() r.update(mq, off, true) return off diff --git a/rlog/log.go b/rlog/log.go index 426f6981..444ec3fb 100644 --- a/rlog/log.go +++ b/rlog/log.go @@ -41,6 +41,7 @@ type Logger interface { Warning(msg string, fields map[string]interface{}) Error(msg string, fields map[string]interface{}) Fatal(msg string, fields map[string]interface{}) + Level(level string) } func init() { @@ -101,11 +102,29 @@ func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) { } l.logger.WithFields(fields).Fatal(msg) } +func (l *defaultLogger) Level(level string) { + switch strings.ToLower(level) { + case "debug": + l.logger.SetLevel(logrus.DebugLevel) + case "warn": + l.logger.SetLevel(logrus.WarnLevel) + case "error": + l.logger.SetLevel(logrus.ErrorLevel) + default: + l.logger.SetLevel(logrus.InfoLevel) + } +} // SetLogger use specified logger user customized, in general, we suggest user to replace the default logger with specified func SetLogger(logger Logger) { rLog = logger } +func SetLogLevel(level string) { + if level == "" { + return + } + rLog.Level(level) +} func Debug(msg string, fields map[string]interface{}) { rLog.Debug(msg, fields)