From 3e3bc03299db71a2c4423e6441a72c3d33fe357c Mon Sep 17 00:00:00 2001 From: caojiaqiang Date: Fri, 9 Aug 2024 14:17:53 +0800 Subject: [PATCH] feat: batchSend messages return result has msgIds and offsetMsgId --- internal/client.go | 8 ++++++-- primitive/message.go | 1 + producer/producer.go | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/client.go b/internal/client.go index 6f27f5e8..938c5faa 100644 --- a/internal/client.go +++ b/internal/client.go @@ -741,8 +741,12 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC } msgIDs := make([]string, 0) - for i := 0; i < len(msgs); i++ { - msgIDs = append(msgIDs, msgs[i].GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)) + if msgs[0].Batch { + for i := range msgs[0].List { + msgIDs = append(msgIDs, msgs[0].List[i].GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)) + } + } else { + msgIDs = append(msgIDs, msgs[0].GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)) } uniqueMsgId := strings.Join(msgIDs, ",") diff --git a/primitive/message.go b/primitive/message.go index 9542413b..5bf5db0a 100644 --- a/primitive/message.go +++ b/primitive/message.go @@ -69,6 +69,7 @@ const ( ) type Message struct { + List []*Message Topic string Body []byte CompressedBody []byte diff --git a/producer/producer.go b/producer/producer.go index eb3cd2e9..df67798a 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -144,12 +144,14 @@ func (p *defaultProducer) encodeBatch(msgs ...*primitive.Message) *primitive.Mes batch.Queue = msgs[0].Queue batch.Body = MarshalMessageBatch(msgs...) batch.Batch = true + batch.List = msgs return batch } func MarshalMessageBatch(msgs ...*primitive.Message) []byte { buffer := bytes.NewBufferString("") for _, msg := range msgs { + msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID()) data := msg.Marshal() buffer.Write(data) }