Skip to content

Commit

Permalink
unconfirmed
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Dec 10, 2024
1 parent 16018dd commit 31f86c5
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 257 deletions.
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ With `ProducerOptions` is possible to customize the Producer behaviour:
```golang
type ProducerOptions struct {
Name string // Producer name, it is useful to handle deduplication messages
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to send a batch of messages.
}
```

Expand Down Expand Up @@ -296,9 +294,32 @@ other producers

### `Send` vs `BatchSend`

The `BatchSend` is the primitive to send the messages, `Send` introduces a smart layer to publish messages and internally uses `BatchSend`.
The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation.
`Send` introduces a smart layer to publish messages and internally uses `BatchSend`.

The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).
Starting from the version 1.5.0 the `Send` uses a different approach to send messages: Dynamic send.</br>

What should you use? </br>
The `Send` method is the best choice for most of the cases:</br>
- It is asynchronous
- It is smart to aggregate the messages in a batch with a low-latency
- It is smart to split the messages in case the size is bigger than `requestedMaxFrameSize`
- You can play with `BatchSize` parameter to increase the throughput

The `BatchSend` is useful in case you need to manage the aggregation by yourself. </br>
It gives you more control over the aggregation process: </br>
- It is synchronous
- It is up to the user to manage the aggregation
- It is up to the user to split the messages in case the size is bigger than `requestedMaxFrameSize`
- It can be faster than `Send` in case the aggregation is managed by the user.

#### Throughput vs Latency</br>
With both methods you can have low-latency and/or high-throughput. </br>
The `Send` is the best choice for low-latency without care about aggregation.
With `BatchSend` you have more control.</br>


Performance test tool can help you to understand the differences between `Send` and `BatchSend` </br>

See also "Client performances" example in the [examples](./examples/performances/) directory

Expand Down Expand Up @@ -354,6 +375,11 @@ See also "Getting started" example in the [examples](./examples/) directory

### Deduplication

The deduplication is a feature that allows to avoid the duplication of messages. </br>
It is enabled by the user by setting the producer name with the options: </br>
```golang
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer"))
```
The stream plugin can handle deduplication data, see this blog post for more details:
https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/ </br>
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
Expand Down
5 changes: 4 additions & 1 deletion examples/deduplication/deduplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func main() {
}

producer, err := env.NewProducer(streamName,
stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication
stream.NewProducerOptions().
// producer name is mandatory to handle the deduplication
// don't use the producer name if you don't need the deduplication
SetProducerName("myProducer"))

CheckErr(err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func newTCPParameterDefault() *TCPParameters {
return &TCPParameters{
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048576,
WriteBuffer: 8192,
ReadBuffer: 8192,
WriteBuffer: defaultWriteSocketBuffer,
ReadBuffer: defaultReadSocketBuffer,
NoDelay: true,
tlsConfig: nil,
}
Expand Down
1 change: 0 additions & 1 deletion pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,6 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
if res.Err == nil {
producer.startUnconfirmedMessagesTimeOutTask()
producer.processSendingMessages()
//producer.startPublishTask()
}
return producer, res.Err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ const (
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

///
defaultWriteSocketBuffer = 8092
defaultReadSocketBuffer = 65536
defaultWriteSocketBuffer = 8192
defaultReadSocketBuffer = 8192
defaultQueuePublisherSize = 10000
minQueuePublisherSize = 100
maxQueuePublisherSize = 1_000_000
Expand Down
14 changes: 9 additions & 5 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,24 @@ func (coordinator *Coordinator) NewProducer(
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
dynSize := 10000
tickerTime := defaultConfirmationTimeOut
if parameters != nil {
dynSize = parameters.BatchSize
tickerTime = parameters.ConfirmationTimeOut
}

var lastId, err = coordinator.getNextProducerItem()
if err != nil {
return nil, err
}
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(5),
status: open,
dynamicSendCh: make(chan *messageSequence, dynSize),
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
timeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}),
status: open,
dynamicSendCh: make(chan *messageSequence, dynSize),
}
coordinator.producers[lastId] = producer
return producer, err
Expand Down
Loading

0 comments on commit 31f86c5

Please sign in to comment.