Skip to content

Commit

Permalink
Add manual commit for super stream consumet (#323)
Browse files Browse the repository at this point in the history
* Add manual commit to super stream consumer 

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Jun 25, 2024
1 parent 0324edc commit 676ccf7
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 2 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,26 @@ You can read also the java stream-client blog post: https://rabbitmq.github.io/r
Super Stream supports [publish-filtering](#publish-filtering) and [consume-filtering](#consume-filtering) features.
Offset tracking is supported for the Super Stream consumer. </br>
In the same way as the standard stream, you can use the `SetAutoCommit` or `SetManualCommit` option to enable/disable the automatic offset tracking. </br>
On the super stream consumer message handler is possible to identify the partition, the consumer and the offset: </br>
```golang
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
....
consumerContext.Consumer.GetName() // consumer name
consumerContext.Consumer.GetOffset() // current offset
consumerContext.Consumer.GetStreamName() // stream name (partition name )
....
}
```
Manual tracking API:
- `consumerContext.Consumer.StoreOffset()`: stores the current offset.
- `consumerContext.Consumer.StoreCustomOffset(xxx)` stores a custom offset.
Like the standard stream, you should avoid to store the offset for each single message: it will reduce the performances.
### Performance test tool
Expand Down
1 change: 1 addition & 0 deletions examples/offsetTracking/offsetTracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func main() {
streamName,
handleMessages,
stream.NewConsumerOptions().
SetManualCommit(). // disable auto commit
SetConsumerName("my_consumer"). // set a consumer name
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
CheckErr(err)
Expand Down
1 change: 1 addition & 0 deletions examples/single_active_consumer/single_active_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
// This is only for the example, in a real application you should not store the offset
// for each message, it is better to store the offset for a batch of messages
err := consumerContext.Consumer.StoreOffset()

CheckErrConsumer(err)
}

Expand Down
15 changes: 13 additions & 2 deletions pkg/stream/super_stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ type SuperStreamConsumerOptions struct {
SingleActiveConsumer *SingleActiveConsumer
ConsumerName string
AutoCommitStrategy *AutoCommitStrategy
Autocommit bool
}

func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions {
return &SuperStreamConsumerOptions{
Offset: OffsetSpecification{}.Next(),
Offset: OffsetSpecification{}.Next(),
Autocommit: false,
}
}

Expand Down Expand Up @@ -49,10 +51,16 @@ func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *Super
}

func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions {
s.Autocommit = true
s.AutoCommitStrategy = autoCommitStrategy
return s
}

func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions {
s.Autocommit = false
return s
}

// CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed
// The user can use the NotifyPartitionClose to get the channel
type CPartitionClose struct {
Expand Down Expand Up @@ -167,8 +175,11 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp
}

options = options.SetFilter(s.SuperStreamConsumerOptions.Filter)
if s.SuperStreamConsumerOptions.AutoCommitStrategy != nil {

if s.SuperStreamConsumerOptions.Autocommit {
options = options.SetAutoCommit(s.SuperStreamConsumerOptions.AutoCommitStrategy)
} else {
options = options.SetManualCommit()
}

if s.SuperStreamConsumerOptions.SingleActiveConsumer != nil {
Expand Down

0 comments on commit 676ccf7

Please sign in to comment.