diff --git a/consumer/consumer.go b/consumer/consumer.go index bbb54155..98eb17b1 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -889,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa SubExpression: data.SubString, // TODO: add subversion ExpressionType: string(data.ExpType), + BrokerName: queue.BrokerName, } if data.ExpType == string(TAG) { @@ -999,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er } request := &internal.GetMaxOffsetRequestHeader{ - Topic: mq.Topic, - QueueId: mq.QueueId, + Topic: mq.Topic, + QueueId: mq.QueueId, + BrokerName: mq.BrokerName, } cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil) @@ -1029,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t } request := &internal.SearchOffsetRequestHeader{ - Topic: mq.Topic, - QueueId: mq.QueueId, - Timestamp: timestamp, + Topic: mq.Topic, + QueueId: mq.QueueId, + Timestamp: timestamp, + BrokerName: mq.BrokerName, } cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil) @@ -1128,6 +1131,7 @@ func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message, Properties: msg.MarshallProperties(), ReconsumeTimes: int(reconsumeTimes), MaxReconsumeTimes: int(maxReconsumeTimes), + BrokerName: mq.BrokerName, } return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body) diff --git a/consumer/offset_store.go b/consumer/offset_store.go index b0f9da97..543f397f 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq ConsumerGroup: group, Topic: mq.Topic, QueueId: mq.QueueId, + BrokerName: mq.BrokerName, } cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil) res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second) @@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p Topic: mq.Topic, QueueId: mq.QueueId, CommitOffset: off, + BrokerName: mq.BrokerName, } cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil) return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second) diff --git a/consumer/process_queue.go b/consumer/process_queue.go index 120595fc..db67c9f7 100644 --- a/consumer/process_queue.go +++ b/consumer/process_queue.go @@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) { rlog.LogKeyQueueOffset: msg.QueueOffset, }) pq.mutex.RUnlock() - if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) { + if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) { rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index b26c8297..a64c1638 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -644,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, d DelayLevel: delayLevel, OriginMsgId: msg.MsgId, MaxReconsumeTimes: pc.getMaxReconsumeTimes(), + BrokerName: msg.Queue.BrokerName, } return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil) @@ -746,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) { SubExpression: sd.SubString, ExpressionType: string(TAG), SuspendTimeoutMillis: 20 * time.Second, + BrokerName: request.mq.BrokerName, } brokerResult := pc.defaultConsumer.tryFindBroker(request.mq) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 0dfeee9c..1a3d2336 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -819,6 +819,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { SubExpression: subExpression, ExpressionType: string(TAG), SuspendTimeoutMillis: 20 * time.Second, + BrokerName: request.mq.BrokerName, } // //if data.ExpType == string(TAG) { @@ -937,6 +938,7 @@ func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLev DelayLevel: delayLevel, OriginMsgId: msg.MsgId, MaxReconsumeTimes: pc.getMaxReconsumeTimes(), + BrokerName: msg.Queue.BrokerName, } return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil) diff --git a/internal/request.go b/internal/request.go index 7e86b508..93d7a5de 100644 --- a/internal/request.go +++ b/internal/request.go @@ -72,6 +72,7 @@ type SendMessageRequestHeader struct { Batch bool DefaultTopic string DefaultTopicQueueNums int + BrokerName string } func (request *SendMessageRequestHeader) Encode() map[string]string { @@ -89,6 +90,7 @@ func (request *SendMessageRequestHeader) Encode() map[string]string { maps["defaultTopicQueueNums"] = "4" maps["batch"] = strconv.FormatBool(request.Batch) maps["properties"] = request.Properties + maps["bname"] = request.BrokerName return maps } @@ -101,6 +103,7 @@ type EndTransactionRequestHeader struct { FromTransactionCheck bool MsgID string TransactionId string + BrokerName string } type SendMessageRequestV2Header struct { @@ -122,6 +125,7 @@ func (request *SendMessageRequestV2Header) Encode() map[string]string { maps["k"] = strconv.FormatBool(request.UnitMode) maps["l"] = strconv.Itoa(request.MaxReconsumeTimes) maps["m"] = strconv.FormatBool(request.Batch) + maps["n"] = request.BrokerName return maps } @@ -134,6 +138,7 @@ func (request *EndTransactionRequestHeader) Encode() map[string]string { maps["fromTransactionCheck"] = strconv.FormatBool(request.FromTransactionCheck) maps["msgId"] = request.MsgID maps["transactionId"] = request.TransactionId + maps["bname"] = request.BrokerName return maps } @@ -185,6 +190,7 @@ type ConsumerSendMsgBackRequestHeader struct { OriginTopic string UnitMode bool MaxReconsumeTimes int32 + BrokerName string } func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string { @@ -196,6 +202,7 @@ func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string { maps["originTopic"] = request.OriginTopic maps["unitMode"] = strconv.FormatBool(request.UnitMode) maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes)) + maps["bname"] = request.BrokerName return maps } @@ -212,6 +219,7 @@ type PullMessageRequestHeader struct { SubExpression string SubVersion int64 ExpressionType string + BrokerName string } func (request *PullMessageRequestHeader) Encode() map[string]string { @@ -227,6 +235,7 @@ func (request *PullMessageRequestHeader) Encode() map[string]string { maps["subscription"] = request.SubExpression maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion) maps["expressionType"] = request.ExpressionType + maps["bname"] = request.BrokerName return maps } @@ -242,14 +251,16 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string { } type GetMaxOffsetRequestHeader struct { - Topic string - QueueId int + Topic string + QueueId int + BrokerName string } func (request *GetMaxOffsetRequestHeader) Encode() map[string]string { maps := make(map[string]string) maps["topic"] = request.Topic maps["queueId"] = strconv.Itoa(request.QueueId) + maps["bname"] = request.BrokerName return maps } @@ -257,6 +268,7 @@ type QueryConsumerOffsetRequestHeader struct { ConsumerGroup string Topic string QueueId int + BrokerName string } func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string { @@ -264,13 +276,15 @@ func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string { maps["consumerGroup"] = request.ConsumerGroup maps["topic"] = request.Topic maps["queueId"] = strconv.Itoa(request.QueueId) + maps["bname"] = request.BrokerName return maps } type SearchOffsetRequestHeader struct { - Topic string - QueueId int - Timestamp int64 + Topic string + QueueId int + Timestamp int64 + BrokerName string } func (request *SearchOffsetRequestHeader) Encode() map[string]string { @@ -278,6 +292,7 @@ func (request *SearchOffsetRequestHeader) Encode() map[string]string { maps["topic"] = request.Topic maps["queueId"] = strconv.Itoa(request.QueueId) maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10) + maps["bname"] = request.BrokerName return maps } @@ -286,6 +301,7 @@ type UpdateConsumerOffsetRequestHeader struct { Topic string QueueId int CommitOffset int64 + BrokerName string } func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string { @@ -294,6 +310,7 @@ func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string { maps["topic"] = request.Topic maps["queueId"] = strconv.Itoa(request.QueueId) maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10) + maps["bname"] = request.BrokerName return maps } diff --git a/internal/trace.go b/internal/trace.go index f7cea8de..344acd65 100644 --- a/internal/trace.go +++ b/internal/trace.go @@ -527,6 +527,7 @@ func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue, BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond), Flag: msg.Flag, Properties: msg.MarshallProperties(), + BrokerName: mq.BrokerName, } return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body) diff --git a/producer/producer.go b/producer/producer.go index eb3cd2e9..00eb351d 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -542,6 +542,7 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, Batch: msg.Batch, DefaultTopic: p.options.CreateTopicKey, DefaultTopicQueueNums: p.options.DefaultTopicQueueNums, + BrokerName: mq.BrokerName, } msgType := msg.GetProperty(primitive.PropertyMsgType) @@ -762,6 +763,7 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e TranStateTableOffset: result.QueueOffset, MsgID: result.MsgID, CommitOrRollback: tp.transactionState(state), + BrokerName: result.MessageQueue.BrokerName, } req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)