From 16018dd014b8339a74bbf377fd74015318993597 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 10 Dec 2024 11:47:39 +0100 Subject: [PATCH] code refactor add unConfirmed struct to balance the unconfirm messages. refactor code Signed-off-by: Gabriele Santomaggio --- pkg/stream/coordinator.go | 11 +- pkg/stream/producer.go | 146 ++++++++++-------------- pkg/stream/producer_unconfirmed.go | 108 ++++++++++++++++++ pkg/stream/producer_unconfirmed_test.go | 84 ++++++++++++++ pkg/stream/server_frame.go | 2 +- 5 files changed, 257 insertions(+), 94 deletions(-) create mode 100644 pkg/stream/producer_unconfirmed.go create mode 100644 pkg/stream/producer_unconfirmed_test.go diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index c00ca241..0376aa7b 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -64,12 +64,11 @@ func (coordinator *Coordinator) NewProducer( return nil, err } var producer = &Producer{id: lastId, - options: parameters, - mutex: &sync.RWMutex{}, - mutexUnconfirmed: &sync.Mutex{}, - unConfirmedMessages: map[int64]*ConfirmationStatus{}, - status: open, - dynamicSendCh: make(chan *messageSequence, dynSize), + options: parameters, + mutex: &sync.RWMutex{}, + unConfirmed: newUnConfirmed(5), + status: open, + dynamicSendCh: make(chan *messageSequence, dynSize), } coordinator.producers[lastId] = producer return producer, err diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 8b24f707..bbf66018 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -59,16 +59,15 @@ type messageSequence struct { } type Producer struct { - id uint8 - options *ProducerOptions - onClose onInternalClose - mutexUnconfirmed *sync.Mutex - unConfirmedMessages map[int64]*ConfirmationStatus - sequence int64 - mutex *sync.RWMutex - publishConfirm chan []*ConfirmationStatus - closeHandler chan Event - status int + id uint8 + options *ProducerOptions + onClose onInternalClose + unConfirmed *unConfirmed + sequence int64 + mutex *sync.RWMutex + publishConfirm chan []*ConfirmationStatus + closeHandler chan Event + status int dynamicSendCh chan *messageSequence } @@ -165,37 +164,30 @@ func NewProducerOptions() *ProducerOptions { } func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - return producer.unConfirmedMessages + return producer.unConfirmed.getAll() } func (producer *Producer) addUnConfirmedSequences(message []*messageSequence, producerID uint8) { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - for _, msg := range message { - producer.unConfirmedMessages[msg.publishingId] = + producer.unConfirmed.add(msg.publishingId, &ConfirmationStatus{ inserted: time.Now(), message: *msg.refMessage, producerID: producerID, publishingId: msg.publishingId, confirmed: false, - } + }) } } func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamMessage, producerID uint8) { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - producer.unConfirmedMessages[sequence] = &ConfirmationStatus{ + producer.unConfirmed.add(sequence, &ConfirmationStatus{ inserted: time.Now(), message: message, producerID: producerID, publishingId: sequence, confirmed: false, - } + }) } func (po *ProducerOptions) isSubEntriesBatching() bool { @@ -203,33 +195,25 @@ func (po *ProducerOptions) isSubEntriesBatching() bool { } func (producer *Producer) removeFromConfirmationStatus(status []*ConfirmationStatus) { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() for _, msg := range status { - delete(producer.unConfirmedMessages, msg.publishingId) + producer.unConfirmed.remove(msg.publishingId) for _, linked := range msg.linkedTo { - delete(producer.unConfirmedMessages, linked.publishingId) + producer.unConfirmed.remove(linked.publishingId) } } } func (producer *Producer) removeUnConfirmed(sequence int64) { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - delete(producer.unConfirmedMessages, sequence) + producer.unConfirmed.remove(sequence) } func (producer *Producer) lenUnConfirmed() int { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - return len(producer.unConfirmedMessages) + return producer.unConfirmed.size() } func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus { - producer.mutexUnconfirmed.Lock() - defer producer.mutexUnconfirmed.Unlock() - return producer.unConfirmedMessages[sequence] + return producer.unConfirmed.get(sequence) } func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm { @@ -270,8 +254,7 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { time.Sleep(2 * time.Second) toRemove := make([]*ConfirmationStatus, 0) // check the unconfirmed messages and remove the one that are expired - producer.mutexUnconfirmed.Lock() - for _, msg := range producer.unConfirmedMessages { + for _, msg := range producer.unConfirmed.getAll() { if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut { msg.err = ConfirmationTimoutError msg.errorCode = timeoutError @@ -279,7 +262,6 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { toRemove = append(toRemove, msg) } } - producer.mutexUnconfirmed.Unlock() if len(toRemove) > 0 { producer.removeFromConfirmationStatus(toRemove) @@ -331,10 +313,6 @@ func (producer *Producer) processSendingMessages() { logs.LogError("Producer %d, error during batch send: %s", producer.GetID(), err) } sent += len(queueMessages) - //frames += result.TotalFrames - //if result.TotalSent != len(queueMessages) { - // logs.LogError("Producer %d, error during batch send: %s", producer.GetID(), err) - //} queueMessages = queueMessages[:0] iterations++ if iterations > 0 && iterations%1000000 == 0 { @@ -380,13 +358,19 @@ func (producer *Producer) processSendingMessages() { // The two goroutines are ready } -// Send sends a message to the stream and returns an error if the message could not be sent. -// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay -// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached. -func (producer *Producer) Send(streamMessage message.StreamMessage) error { +func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { + sequence := message.GetPublishingId() + // in case of sub entry the deduplication is disabled + if !message.HasPublishingId() || producer.options.isSubEntriesBatching() { + sequence = atomic.AddInt64(&producer.sequence, 1) + } + return sequence +} + +func (producer *Producer) fromMessageToMessageSequence(streamMessage message.StreamMessage) (*messageSequence, error) { marshalBinary, err := streamMessage.MarshalBinary() if err != nil { - return err + return nil, err } seq := producer.assignPublishingID(streamMessage) filterValue := "" @@ -394,14 +378,32 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { filterValue = producer.options.Filter.FilterValue(streamMessage) } - if len(marshalBinary) > producer.options.client.getTuneState().requestedMaxFrameSize { + return &messageSequence{ + messageBytes: marshalBinary, + unCompressedSize: len(marshalBinary), + publishingId: seq, + filterValue: filterValue, + refMessage: &streamMessage, + }, nil + +} + +// Send sends a message to the stream and returns an error if the message could not be sent. +// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay +// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached. +func (producer *Producer) Send(streamMessage message.StreamMessage) error { + messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) + if err != nil { + return err + } + if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize { if producer.publishConfirm != nil { producer.publishConfirm <- []*ConfirmationStatus{ { inserted: time.Now(), message: streamMessage, producerID: producer.GetID(), - publishingId: seq, + publishingId: messageSeq.publishingId, confirmed: false, err: FrameTooLarge, errorCode: responseCodeFrameTooLarge, @@ -410,60 +412,31 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { } return FrameTooLarge } - producer.addUnConfirmed(seq, streamMessage, producer.GetID()) - producer.dynamicSendCh <- &messageSequence{ - messageBytes: marshalBinary, - unCompressedSize: len(marshalBinary), - publishingId: seq, - filterValue: filterValue, - refMessage: &streamMessage, - } + producer.addUnConfirmed(messageSeq.publishingId, streamMessage, producer.GetID()) + producer.dynamicSendCh <- messageSeq return nil } -func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { - sequence := message.GetPublishingId() - // in case of sub entry the deduplication is disabled - if !message.HasPublishingId() || producer.options.isSubEntriesBatching() { - sequence = atomic.AddInt64(&producer.sequence, 1) - } - return sequence -} - // BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. // The method is synchronous. The aggregation is up to the user. The user has to aggregate the messages // and send them in a batch. // BatchSend is not affected by the BatchSize and BatchPublishingDelay options. -// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and -// calls BatchSend internally. -// It automatically splits the messages in multiple frames if the total size of the messages is greater than the -// requestedMaxFrameSize. func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize var messagesSequence = make([]*messageSequence, 0) totalBufferToSend := 0 for _, batchMessage := range batchMessages { - messageBytes, err := batchMessage.MarshalBinary() + messageSeq, err := producer.fromMessageToMessageSequence(batchMessage) if err != nil { return err } - filterValue := "" - if producer.options.IsFilterEnabled() { - filterValue = producer.options.Filter.FilterValue(batchMessage) - } - totalBufferToSend += len(messageBytes) + totalBufferToSend += len(messageSeq.messageBytes) // if the totalBufferToSend is greater than the requestedMaxFrameSize // the producer sends the messages and reset the buffer // it splits the messages in multiple frames - messagesSequence = append(messagesSequence, &messageSequence{ - messageBytes: messageBytes, - unCompressedSize: len(messageBytes), - publishingId: producer.assignPublishingID(batchMessage), - filterValue: filterValue, - refMessage: &batchMessage, - }) + messagesSequence = append(messagesSequence, messageSeq) } // @@ -492,6 +465,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error func (producer *Producer) GetID() uint8 { return producer.id } + func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error { return producer.internalBatchSendProdId(messagesSequence, producer.GetID()) } @@ -631,9 +605,8 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq } func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) { - producer.mutexUnconfirmed.Lock() - for _, msg := range producer.unConfirmedMessages { + for _, msg := range producer.unConfirmed.getAll() { msg.confirmed = false msg.err = err msg.errorCode = errorCode @@ -641,9 +614,8 @@ func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) producer.publishConfirm <- []*ConfirmationStatus{msg} } } - producer.unConfirmedMessages = make(map[int64]*ConfirmationStatus) + producer.unConfirmed.clear() - producer.mutexUnconfirmed.Unlock() } func (producer *Producer) GetLastPublishingId() (int64, error) { diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go new file mode 100644 index 00000000..5162849b --- /dev/null +++ b/pkg/stream/producer_unconfirmed.go @@ -0,0 +1,108 @@ +package stream + +import "sync" + +type unConfirmedPartition struct { + messages map[int64]*ConfirmationStatus + mutex sync.Mutex +} + +func newUnConfirmedPartition() *unConfirmedPartition { + return &unConfirmedPartition{ + messages: make(map[int64]*ConfirmationStatus), + } +} + +func (u *unConfirmedPartition) add(id int64, cf *ConfirmationStatus) { + u.mutex.Lock() + defer u.mutex.Unlock() + u.messages[id] = cf +} + +func (u *unConfirmedPartition) remove(id int64) { + u.mutex.Lock() + defer u.mutex.Unlock() + delete(u.messages, id) +} + +func (u *unConfirmedPartition) get(id int64) *ConfirmationStatus { + u.mutex.Lock() + defer u.mutex.Unlock() + return u.messages[id] +} + +func (u *unConfirmedPartition) size() int { + u.mutex.Lock() + defer u.mutex.Unlock() + return len(u.messages) +} +func (u *unConfirmedPartition) clear() { + u.mutex.Lock() + defer u.mutex.Unlock() + u.messages = make(map[int64]*ConfirmationStatus) +} + +func (u *unConfirmedPartition) getAll() map[int64]*ConfirmationStatus { + u.mutex.Lock() + defer u.mutex.Unlock() + result := make(map[int64]*ConfirmationStatus) + for i, message := range u.messages { + result[i] = message + } + return result +} + +type unConfirmed struct { + partitions []*unConfirmedPartition + partitionNumber int +} + +func newUnConfirmed(partitionNumber int) *unConfirmed { + var partitions []*unConfirmedPartition + for i := 0; i < partitionNumber; i++ { + partitions = append(partitions, newUnConfirmedPartition()) + } + return &unConfirmed{ + partitions: partitions, + partitionNumber: partitionNumber, + } +} + +func (u *unConfirmed) add(id int64, cf *ConfirmationStatus) { + partition := id % int64(u.partitionNumber) + u.partitions[partition].add(id, cf) +} + +func (u *unConfirmed) remove(id int64) { + partition := id % int64(u.partitionNumber) + u.partitions[partition].remove(id) +} + +func (u *unConfirmed) get(id int64) *ConfirmationStatus { + partition := id % int64(u.partitionNumber) + return u.partitions[partition].get(id) +} + +func (u *unConfirmed) size() int { + size := 0 + for _, partition := range u.partitions { + size += partition.size() + } + return size +} + +func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { + result := make(map[int64]*ConfirmationStatus) + for _, partition := range u.partitions { + for _, status := range partition.getAll() { + result[status.publishingId] = status + } + } + return result +} + +func (u *unConfirmed) clear() { + for _, partition := range u.partitions { + partition.clear() + } +} diff --git a/pkg/stream/producer_unconfirmed_test.go b/pkg/stream/producer_unconfirmed_test.go new file mode 100644 index 00000000..bfda3621 --- /dev/null +++ b/pkg/stream/producer_unconfirmed_test.go @@ -0,0 +1,84 @@ +package stream + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sync" +) + +var _ = Describe("UnConfirmed tests ", func() { + + It("Hash Consistency", func() { + unConfirmed := newUnConfirmed(10) + for i := 0; i < 10; i++ { + unConfirmed.add(int64(i), &ConfirmationStatus{}) + } + for i := 0; i < 10; i++ { + Expect(unConfirmed.get(int64(i))).NotTo(BeNil()) + } + Expect(len(unConfirmed.partitions)).To(Equal(10)) + Expect(unConfirmed.partitions[0].size()).To(Equal(1)) + Expect(unConfirmed.partitions[1].size()).To(Equal(1)) + Expect(unConfirmed.partitions[2].size()).To(Equal(1)) + Expect(unConfirmed.partitions[3].size()).To(Equal(1)) + Expect(unConfirmed.partitions[4].size()).To(Equal(1)) + Expect(unConfirmed.partitions[5].size()).To(Equal(1)) + Expect(unConfirmed.partitions[6].size()).To(Equal(1)) + Expect(unConfirmed.partitions[7].size()).To(Equal(1)) + Expect(unConfirmed.partitions[8].size()).To(Equal(1)) + Expect(unConfirmed.partitions[9].size()).To(Equal(1)) + unConfirmed.clear() + for i := 0; i < 10; i++ { + Expect(unConfirmed.partitions[i].size()).To(Equal(0)) + } + Expect(unConfirmed.size()).To(Equal(0)) + }) + + It("GetAll Result should be consistent", func() { + // the map should be order + // even it is not strictly necessary to be order + + for sz := 1; sz <= 10; sz++ { + unConfirmed := newUnConfirmed(sz) + for i := 0; i < 500; i++ { + unConfirmed.add(int64(i), &ConfirmationStatus{}) + } + result := unConfirmed.getAll() + exceptedValue := 0 + for i, status := range result { + Expect(i).To(Equal(status.GetPublishingId())) + Expect(i).To(Equal(int64(exceptedValue))) + exceptedValue++ + } + } + + }) + + It("GetAll Result should be consistent in multi-thread", func() { + // the map should be order in multi-thread + // even it is not strictly necessary to be order + + for sz := 1; sz <= 10; sz++ { + unConfirmed := newUnConfirmed(sz) + wait := &sync.WaitGroup{} + for i := 0; i < 500; i++ { + wait.Add(1) + go func(idx int) { + unConfirmed.add(int64(idx), &ConfirmationStatus{}) + wait.Done() + }(i) + } + wait.Wait() + + result := unConfirmed.getAll() + exceptedValue := 0 + for i, status := range result { + Expect(i).To(Equal(status.GetPublishingId())) + Expect(i).To(Equal(int64(exceptedValue))) + exceptedValue++ + } + } + + }) + +}) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index cdb16861..82a9b397 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -474,7 +474,7 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) { producer, err := c.coordinator.GetProducerById(publisherId) if err != nil { logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code)) - producer = &Producer{unConfirmedMessages: map[int64]*ConfirmationStatus{}} + producer = &Producer{unConfirmed: newUnConfirmed(10)} } else { unConfirmedMessage := producer.getUnConfirmed(publishingId)