Skip to content

Commit

Permalink
fix offset of single active consumer reset to 0 (#336)
Browse files Browse the repository at this point in the history
* fix consumer offset being reset to 0 when closing consumer when no messages have been consumed
  • Loading branch information
Darthmineboy authored Jul 22, 2024
1 parent e485574 commit 18547a0
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,9 @@ func (c *Client) DeclareSubscriber(streamName string,
// copy the option offset to the consumer offset
// the option.offset won't change ( in case we need to retrive the original configuration)
// consumer.current offset will be moved when reading
consumer.setCurrentOffset(options.Offset.offset)
if !options.IsSingleActiveConsumerEnabled() {
consumer.setCurrentOffset(options.Offset.offset)
}

/// define the consumerOptions
consumerProperties := make(map[string]string)
Expand Down
60 changes: 60 additions & 0 deletions pkg/stream/consumer_sac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,64 @@ var _ = Describe("Streaming Single Active Consumer", func() {
Expect(c2.Close()).NotTo(HaveOccurred())
})

It("offset should not be overwritten by autocommit on consumer close when no messages have been consumed", func() {
producer, err := testEnvironment.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())

consumerUpdate := func(streamName string, isActive bool) OffsetSpecification {
offset, err := testEnvironment.QueryOffset("my_consumer", streamName)
if err != nil {
return OffsetSpecification{}.First()
}

return OffsetSpecification{}.Offset(offset + 1)
}

var messagesReceived int32 = 0
consumerA, err := testEnvironment.NewConsumer(streamName,
func(consumerContext ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&messagesReceived, 1)
}, NewConsumerOptions().
SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)).
SetConsumerName("my_consumer").
SetAutoCommit(nil))
Expect(err).NotTo(HaveOccurred())

Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred())
Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(10)),
"consumer should receive only 10 messages")

Expect(consumerA.Close()).NotTo(HaveOccurred())
Expect(consumerA.GetLastStoredOffset()).To(Equal(int64(9)))

offset, err := testEnvironment.QueryOffset("my_consumer", streamName)
Expect(err).NotTo(HaveOccurred())
Expect(offset).To(Equal(int64(9)))

messagesReceived = 0
consumerB, err := testEnvironment.NewConsumer(streamName,
func(consumerContext ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&messagesReceived, 1)
}, NewConsumerOptions().
SetConsumerName("my_consumer").
SetSingleActiveConsumer(NewSingleActiveConsumer(consumerUpdate)).
SetAutoCommit(nil))
Expect(err).NotTo(HaveOccurred())

Expect(consumerB.Close()).NotTo(HaveOccurred())
time.Sleep(100 * time.Millisecond)
Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(0)),
"consumer should have received no messages")

offsetAfter, err := testEnvironment.QueryOffset("my_consumer", streamName)
Expect(err).NotTo(HaveOccurred())
Expect(offsetAfter).To(Equal(int64(9)))

Expect(producer.Close()).NotTo(HaveOccurred())
})

})
1 change: 1 addition & 0 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler,
status: open,
mutex: &sync.Mutex{},
MessagesHandler: messagesHandler,
currentOffset: -1, // currentOffset has to equal lastStoredOffset as the currentOffset 0 may otherwise be flushed to the server when the consumer is closed and auto commit is enabled
lastStoredOffset: -1, // because 0 is a valid value for the offset
isPromotedAsActive: true,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea
responseOff := consumer.options.SingleActiveConsumer.ConsumerUpdate(consumer.GetStreamName(),
isActive == 1)
consumer.options.SingleActiveConsumer.offsetSpecification = responseOff

if isActive == 1 {
consumer.setCurrentOffset(responseOff.offset)
}

err = consumer.writeConsumeUpdateOffsetToSocket(readProtocol.CorrelationId, responseOff)
logErrorCommand(err, "handleConsumerUpdate writeConsumeUpdateOffsetToSocket")
}
Expand Down

0 comments on commit 18547a0

Please sign in to comment.