diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index d2aee5b7..25b8ce87 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -913,7 +913,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. Properties: make(map[string]string), ConsumerGroup: pc.consumerGroup, MQ: mq, - Msgs: msgs, + Msgs: subMsgs, } ctx := context.Background() ctx = primitive.WithConsumerCtx(ctx, msgCtx) @@ -944,14 +944,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. } else { increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs)) if pc.model == BroadCasting { - for i := 0; i < len(msgs); i++ { + for i := 0; i < len(subMsgs); i++ { rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{ "message": subMsgs[i], }) } } else { - for i := 0; i < len(msgs); i++ { - msg := msgs[i] + for i := 0; i < len(subMsgs); i++ { + msg := subMsgs[i] if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) { msg.ReconsumeTimes += 1 msgBackFailed = append(msgBackFailed, msg) @@ -973,7 +973,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive. } else { rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{ rlog.LogKeyMessageQueue: mq, - "message": msgs, + "message": subMsgs, }) } })