Skip to content

Commit

Permalink
[ISSUE #1160] add brokerName in request protocol (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 committed Sep 2, 2024
1 parent 8cfa33a commit 98ee94d
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 11 deletions.
14 changes: 9 additions & 5 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
SubExpression: data.SubString,
// TODO: add subversion
ExpressionType: string(data.ExpType),
BrokerName: queue.BrokerName,
}

if data.ExpType == string(TAG) {
Expand Down Expand Up @@ -999,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
}

request := &internal.GetMaxOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Topic: mq.Topic,
QueueId: mq.QueueId,
BrokerName: mq.BrokerName,
}

cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
Expand Down Expand Up @@ -1029,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
}

request := &internal.SearchOffsetRequestHeader{
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
Topic: mq.Topic,
QueueId: mq.QueueId,
Timestamp: timestamp,
BrokerName: mq.BrokerName,
}

cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
Expand Down Expand Up @@ -1128,6 +1131,7 @@ func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message,
Properties: msg.MarshallProperties(),
ReconsumeTimes: int(reconsumeTimes),
MaxReconsumeTimes: int(maxReconsumeTimes),
BrokerName: mq.BrokerName,
}

return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)
Expand Down
2 changes: 2 additions & 0 deletions consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
ConsumerGroup: group,
Topic: mq.Topic,
QueueId: mq.QueueId,
BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
Expand Down Expand Up @@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
Topic: mq.Topic,
QueueId: mq.QueueId,
CommitOffset: off,
BrokerName: mq.BrokerName,
}
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
rlog.LogKeyQueueOffset: msg.QueueOffset,
})
pq.mutex.RUnlock()
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
Expand Down
2 changes: 2 additions & 0 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, d
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
BrokerName: msg.Queue.BrokerName,
}

return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
Expand Down Expand Up @@ -746,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
SubExpression: sd.SubString,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
BrokerName: request.mq.BrokerName,
}

brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
Expand Down
2 changes: 2 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
SubExpression: subExpression,
ExpressionType: string(TAG),
SuspendTimeoutMillis: 20 * time.Second,
BrokerName: request.mq.BrokerName,
}
//
//if data.ExpType == string(TAG) {
Expand Down Expand Up @@ -937,6 +938,7 @@ func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLev
DelayLevel: delayLevel,
OriginMsgId: msg.MsgId,
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
BrokerName: msg.Queue.BrokerName,
}

return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
Expand Down
27 changes: 22 additions & 5 deletions internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type SendMessageRequestHeader struct {
Batch bool
DefaultTopic string
DefaultTopicQueueNums int
BrokerName string
}

func (request *SendMessageRequestHeader) Encode() map[string]string {
Expand All @@ -89,6 +90,7 @@ func (request *SendMessageRequestHeader) Encode() map[string]string {
maps["defaultTopicQueueNums"] = "4"
maps["batch"] = strconv.FormatBool(request.Batch)
maps["properties"] = request.Properties
maps["bname"] = request.BrokerName

return maps
}
Expand All @@ -101,6 +103,7 @@ type EndTransactionRequestHeader struct {
FromTransactionCheck bool
MsgID string
TransactionId string
BrokerName string
}

type SendMessageRequestV2Header struct {
Expand All @@ -122,6 +125,7 @@ func (request *SendMessageRequestV2Header) Encode() map[string]string {
maps["k"] = strconv.FormatBool(request.UnitMode)
maps["l"] = strconv.Itoa(request.MaxReconsumeTimes)
maps["m"] = strconv.FormatBool(request.Batch)
maps["n"] = request.BrokerName
return maps
}

Expand All @@ -134,6 +138,7 @@ func (request *EndTransactionRequestHeader) Encode() map[string]string {
maps["fromTransactionCheck"] = strconv.FormatBool(request.FromTransactionCheck)
maps["msgId"] = request.MsgID
maps["transactionId"] = request.TransactionId
maps["bname"] = request.BrokerName
return maps
}

Expand Down Expand Up @@ -185,6 +190,7 @@ type ConsumerSendMsgBackRequestHeader struct {
OriginTopic string
UnitMode bool
MaxReconsumeTimes int32
BrokerName string
}

func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
Expand All @@ -196,6 +202,7 @@ func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
maps["originTopic"] = request.OriginTopic
maps["unitMode"] = strconv.FormatBool(request.UnitMode)
maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
maps["bname"] = request.BrokerName

return maps
}
Expand All @@ -212,6 +219,7 @@ type PullMessageRequestHeader struct {
SubExpression string
SubVersion int64
ExpressionType string
BrokerName string
}

func (request *PullMessageRequestHeader) Encode() map[string]string {
Expand All @@ -227,6 +235,7 @@ func (request *PullMessageRequestHeader) Encode() map[string]string {
maps["subscription"] = request.SubExpression
maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
maps["expressionType"] = request.ExpressionType
maps["bname"] = request.BrokerName

return maps
}
Expand All @@ -242,42 +251,48 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string {
}

type GetMaxOffsetRequestHeader struct {
Topic string
QueueId int
Topic string
QueueId int
BrokerName string
}

func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["bname"] = request.BrokerName
return maps
}

type QueryConsumerOffsetRequestHeader struct {
ConsumerGroup string
Topic string
QueueId int
BrokerName string
}

func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["bname"] = request.BrokerName
return maps
}

type SearchOffsetRequestHeader struct {
Topic string
QueueId int
Timestamp int64
Topic string
QueueId int
Timestamp int64
BrokerName string
}

func (request *SearchOffsetRequestHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
maps["bname"] = request.BrokerName
return maps
}

Expand All @@ -286,6 +301,7 @@ type UpdateConsumerOffsetRequestHeader struct {
Topic string
QueueId int
CommitOffset int64
BrokerName string
}

func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
Expand All @@ -294,6 +310,7 @@ func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
maps["topic"] = request.Topic
maps["queueId"] = strconv.Itoa(request.QueueId)
maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
maps["bname"] = request.BrokerName
return maps
}

Expand Down
1 change: 1 addition & 0 deletions internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
Properties: msg.MarshallProperties(),
BrokerName: mq.BrokerName,
}

return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)
Expand Down
2 changes: 2 additions & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
Batch: msg.Batch,
DefaultTopic: p.options.CreateTopicKey,
DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
BrokerName: mq.BrokerName,
}

msgType := msg.GetProperty(primitive.PropertyMsgType)
Expand Down Expand Up @@ -762,6 +763,7 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
TranStateTableOffset: result.QueueOffset,
MsgID: result.MsgID,
CommitOrRollback: tp.transactionState(state),
BrokerName: result.MessageQueue.BrokerName,
}

req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)
Expand Down

0 comments on commit 98ee94d

Please sign in to comment.