Skip to content

Commit

Permalink
code refactor
Browse files Browse the repository at this point in the history
add unConfirmed struct to balance the unconfirm messages.
refactor code

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Dec 10, 2024
1 parent 3930f60 commit 16018dd
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 94 deletions.
11 changes: 5 additions & 6 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func (coordinator *Coordinator) NewProducer(
return nil, err
}
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
mutexUnconfirmed: &sync.Mutex{},
unConfirmedMessages: map[int64]*ConfirmationStatus{},
status: open,
dynamicSendCh: make(chan *messageSequence, dynSize),
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(5),
status: open,
dynamicSendCh: make(chan *messageSequence, dynSize),
}
coordinator.producers[lastId] = producer
return producer, err
Expand Down
146 changes: 59 additions & 87 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,15 @@ type messageSequence struct {
}

type Producer struct {
id uint8
options *ProducerOptions
onClose onInternalClose
mutexUnconfirmed *sync.Mutex
unConfirmedMessages map[int64]*ConfirmationStatus
sequence int64
mutex *sync.RWMutex
publishConfirm chan []*ConfirmationStatus
closeHandler chan Event
status int
id uint8
options *ProducerOptions
onClose onInternalClose
unConfirmed *unConfirmed
sequence int64
mutex *sync.RWMutex
publishConfirm chan []*ConfirmationStatus
closeHandler chan Event
status int

dynamicSendCh chan *messageSequence
}
Expand Down Expand Up @@ -165,71 +164,56 @@ func NewProducerOptions() *ProducerOptions {
}

func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()
return producer.unConfirmedMessages
return producer.unConfirmed.getAll()
}

func (producer *Producer) addUnConfirmedSequences(message []*messageSequence, producerID uint8) {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()

for _, msg := range message {
producer.unConfirmedMessages[msg.publishingId] =
producer.unConfirmed.add(msg.publishingId,
&ConfirmationStatus{
inserted: time.Now(),
message: *msg.refMessage,
producerID: producerID,
publishingId: msg.publishingId,
confirmed: false,
}
})
}

}
func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamMessage, producerID uint8) {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()
producer.unConfirmedMessages[sequence] = &ConfirmationStatus{
producer.unConfirmed.add(sequence, &ConfirmationStatus{
inserted: time.Now(),
message: message,
producerID: producerID,
publishingId: sequence,
confirmed: false,
}
})
}

func (po *ProducerOptions) isSubEntriesBatching() bool {
return po.SubEntrySize > 1
}

func (producer *Producer) removeFromConfirmationStatus(status []*ConfirmationStatus) {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()

for _, msg := range status {
delete(producer.unConfirmedMessages, msg.publishingId)
producer.unConfirmed.remove(msg.publishingId)
for _, linked := range msg.linkedTo {
delete(producer.unConfirmedMessages, linked.publishingId)
producer.unConfirmed.remove(linked.publishingId)
}
}
}

func (producer *Producer) removeUnConfirmed(sequence int64) {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()
delete(producer.unConfirmedMessages, sequence)
producer.unConfirmed.remove(sequence)
}

func (producer *Producer) lenUnConfirmed() int {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()
return len(producer.unConfirmedMessages)
return producer.unConfirmed.size()
}

func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus {
producer.mutexUnconfirmed.Lock()
defer producer.mutexUnconfirmed.Unlock()
return producer.unConfirmedMessages[sequence]
return producer.unConfirmed.get(sequence)
}

func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm {
Expand Down Expand Up @@ -270,16 +254,14 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {
time.Sleep(2 * time.Second)
toRemove := make([]*ConfirmationStatus, 0)
// check the unconfirmed messages and remove the one that are expired
producer.mutexUnconfirmed.Lock()
for _, msg := range producer.unConfirmedMessages {
for _, msg := range producer.unConfirmed.getAll() {
if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut {
msg.err = ConfirmationTimoutError
msg.errorCode = timeoutError
msg.confirmed = false
toRemove = append(toRemove, msg)
}
}
producer.mutexUnconfirmed.Unlock()

if len(toRemove) > 0 {
producer.removeFromConfirmationStatus(toRemove)
Expand Down Expand Up @@ -331,10 +313,6 @@ func (producer *Producer) processSendingMessages() {
logs.LogError("Producer %d, error during batch send: %s", producer.GetID(), err)
}
sent += len(queueMessages)
//frames += result.TotalFrames
//if result.TotalSent != len(queueMessages) {
// logs.LogError("Producer %d, error during batch send: %s", producer.GetID(), err)
//}
queueMessages = queueMessages[:0]
iterations++
if iterations > 0 && iterations%1000000 == 0 {
Expand Down Expand Up @@ -380,28 +358,52 @@ func (producer *Producer) processSendingMessages() {
// The two goroutines are ready
}

// Send sends a message to the stream and returns an error if the message could not be sent.
// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay
// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
func (producer *Producer) Send(streamMessage message.StreamMessage) error {
func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
sequence := message.GetPublishingId()
// in case of sub entry the deduplication is disabled
if !message.HasPublishingId() || producer.options.isSubEntriesBatching() {
sequence = atomic.AddInt64(&producer.sequence, 1)
}
return sequence
}

func (producer *Producer) fromMessageToMessageSequence(streamMessage message.StreamMessage) (*messageSequence, error) {
marshalBinary, err := streamMessage.MarshalBinary()
if err != nil {
return err
return nil, err
}
seq := producer.assignPublishingID(streamMessage)
filterValue := ""
if producer.options.IsFilterEnabled() {
filterValue = producer.options.Filter.FilterValue(streamMessage)
}

if len(marshalBinary) > producer.options.client.getTuneState().requestedMaxFrameSize {
return &messageSequence{
messageBytes: marshalBinary,
unCompressedSize: len(marshalBinary),
publishingId: seq,
filterValue: filterValue,
refMessage: &streamMessage,
}, nil

}

// Send sends a message to the stream and returns an error if the message could not be sent.
// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay
// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
func (producer *Producer) Send(streamMessage message.StreamMessage) error {
messageSeq, err := producer.fromMessageToMessageSequence(streamMessage)
if err != nil {
return err
}
if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize {
if producer.publishConfirm != nil {
producer.publishConfirm <- []*ConfirmationStatus{
{
inserted: time.Now(),
message: streamMessage,
producerID: producer.GetID(),
publishingId: seq,
publishingId: messageSeq.publishingId,
confirmed: false,
err: FrameTooLarge,
errorCode: responseCodeFrameTooLarge,
Expand All @@ -410,60 +412,31 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
}
return FrameTooLarge
}
producer.addUnConfirmed(seq, streamMessage, producer.GetID())
producer.dynamicSendCh <- &messageSequence{
messageBytes: marshalBinary,
unCompressedSize: len(marshalBinary),
publishingId: seq,
filterValue: filterValue,
refMessage: &streamMessage,
}
producer.addUnConfirmed(messageSeq.publishingId, streamMessage, producer.GetID())
producer.dynamicSendCh <- messageSeq
return nil
}

func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
sequence := message.GetPublishingId()
// in case of sub entry the deduplication is disabled
if !message.HasPublishingId() || producer.options.isSubEntriesBatching() {
sequence = atomic.AddInt64(&producer.sequence, 1)
}
return sequence
}

// BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent.
// The method is synchronous. The aggregation is up to the user. The user has to aggregate the messages
// and send them in a batch.
// BatchSend is not affected by the BatchSize and BatchPublishingDelay options.
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
// calls BatchSend internally.
// It automatically splits the messages in multiple frames if the total size of the messages is greater than the
// requestedMaxFrameSize.
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
var messagesSequence = make([]*messageSequence, 0)
totalBufferToSend := 0
for _, batchMessage := range batchMessages {
messageBytes, err := batchMessage.MarshalBinary()
messageSeq, err := producer.fromMessageToMessageSequence(batchMessage)
if err != nil {
return err
}
filterValue := ""
if producer.options.IsFilterEnabled() {
filterValue = producer.options.Filter.FilterValue(batchMessage)
}

totalBufferToSend += len(messageBytes)
totalBufferToSend += len(messageSeq.messageBytes)
// if the totalBufferToSend is greater than the requestedMaxFrameSize
// the producer sends the messages and reset the buffer
// it splits the messages in multiple frames

messagesSequence = append(messagesSequence, &messageSequence{
messageBytes: messageBytes,
unCompressedSize: len(messageBytes),
publishingId: producer.assignPublishingID(batchMessage),
filterValue: filterValue,
refMessage: &batchMessage,
})
messagesSequence = append(messagesSequence, messageSeq)
}
//

Expand Down Expand Up @@ -492,6 +465,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
func (producer *Producer) GetID() uint8 {
return producer.id
}

func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error {
return producer.internalBatchSendProdId(messagesSequence, producer.GetID())
}
Expand Down Expand Up @@ -631,19 +605,17 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq
}

func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) {
producer.mutexUnconfirmed.Lock()

for _, msg := range producer.unConfirmedMessages {
for _, msg := range producer.unConfirmed.getAll() {
msg.confirmed = false
msg.err = err
msg.errorCode = errorCode
if producer.publishConfirm != nil {
producer.publishConfirm <- []*ConfirmationStatus{msg}
}
}
producer.unConfirmedMessages = make(map[int64]*ConfirmationStatus)
producer.unConfirmed.clear()

producer.mutexUnconfirmed.Unlock()
}

func (producer *Producer) GetLastPublishingId() (int64, error) {
Expand Down
Loading

0 comments on commit 16018dd

Please sign in to comment.