Skip to content

Commit

Permalink
Send retry as a normal message when sendMessageBack fails
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz committed Nov 13, 2023
1 parent c9e197c commit 6d4ffe7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
96 changes: 96 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -57,6 +58,9 @@ const (

// Offset persistent interval for consumer
_PersistConsumerOffsetInterval = 5 * time.Second

// Timeout for sending message to retry topic
_SendMessageBackAsNormalTimeout = 3 * time.Second
)

type ConsumeType string
Expand All @@ -66,6 +70,8 @@ const (
_PushConsume = ConsumeType("CONSUME_PASSIVELY")

_SubAll = "*"

_ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
)

// Message model defines the way how messages are delivered to each consumer clients.
Expand Down Expand Up @@ -1037,6 +1043,96 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}

func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, maxReconsumeTimes int32) bool {
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
normalMsg := &primitive.Message{
Topic: retryTopic,
Body: msg.Body,
Flag: msg.Flag,
}
normalMsg.WithProperties(msg.GetProperties())
originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId)
if len(originMsgId) == 0 {
originMsgId = msg.MsgId
}
normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId)
normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic)
normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared)
normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes))

mq, err := dc.findPublishMessageQueue(retryTopic)
if err != nil {
rlog.Warning("sendMessageBackAsNormal find publish message queue error", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}

brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if len(brokerAddr) == 0 {
rlog.Warning("sendMessageBackAsNormal cannot find broker address", map[string]interface{}{
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: mq.BrokerName,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}

request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, maxReconsumeTimes)
resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, request, _SendMessageBackAsNormalTimeout)
if err != nil {
rlog.Warning("sendMessageBackAsNormal failed to invoke", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: err.Error(),
})
return false
}
if resp.Code != internal.ResSuccess {
rlog.Warning("sendMessageBackAsNormal failed to send", map[string]interface{}{
rlog.LogKeyTopic: retryTopic,
rlog.LogKeyMessageId: msg.MsgId,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: %s", resp.Code, resp.Remark),
})
return false
}

return true
}

func (dc *defaultConsumer) findPublishMessageQueue(topic string) (*primitive.MessageQueue, error) {
mqs, err := dc.client.GetNameSrv().FetchPublishMessageQueues(topic)
if err != nil {
return nil, err
}

if len(mqs) <= 0 {
return nil, fmt.Errorf("no writable queues")
}

return mqs[rand.Intn(len(mqs))], nil
}

func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message, reconsumeTimes,
maxReconsumeTimes int32) *remote.RemotingCommand {
req := &internal.SendMessageRequestHeader{
ProducerGroup: _ClientInnerProducerGroup,
Topic: mq.Topic,
QueueId: mq.QueueId,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
Properties: msg.MarshallProperties(),
ReconsumeTimes: int(reconsumeTimes),
MaxReconsumeTimes: int(maxReconsumeTimes),
}

return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)
}

func buildSubscriptionData(topic string, selector MessageSelector) *internal.SubscriptionData {
subData := &internal.SubscriptionData{
Topic: topic,
Expand Down
8 changes: 6 additions & 2 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,12 @@ func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg *primitive
} else {
brokerAddr = msg.StoreHost
}
_, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
return err == nil
resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil || resp.Code != internal.ResSuccess {
// send back as a normal message
return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes())
}
return true
}

func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand {
Expand Down
7 changes: 4 additions & 3 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,9 +921,10 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.Messag
} else {
brokerAddr = msg.StoreHost
}
_, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil {
return false
resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil || resp.Code != internal.ResSuccess {
// send back as a normal message
return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes())
}
return true
}
Expand Down

0 comments on commit 6d4ffe7

Please sign in to comment.