Skip to content

Commit

Permalink
enhancement: increase the offset by 1 for the consumer offset during restore
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamcoc committed Sep 26, 2023
1 parent 3d844b6 commit 9841257
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
19 changes: 13 additions & 6 deletions impl/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ type Writer interface {
const defaultMaxWaitingTimeForNewMessage = time.Duration(30) * time.Second

func (e *Exporter) Run() (exportedCount uint64, err error) {
err = e.StoreConsumerGroupOffset()
if err != nil {
return exportedCount, errors.Wrap(err, "unable to read consumer group")
}
err = e.consumer.SubscribeTopics(e.topics, nil)
if err != nil {
return
Expand All @@ -61,13 +57,21 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
if err != nil {
panic(err)
}
err = e.StoreConsumerGroupOffset()
if err != nil {
panic(errors.Wrap(err, "unable to read consumer group"))
}
os.Exit(1)
}()
defer func() {
err = e.flushData()
if err != nil {
panic(err)
}
err = e.StoreConsumerGroupOffset()
if err != nil {
panic(errors.Wrap(err, "unable to read consumer group"))
}
}()
maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage
if e.options.MaxWaitingTimeForNewMessage != nil {
Expand Down Expand Up @@ -201,13 +205,16 @@ func (e *Exporter) StoreConsumerGroupOffset() error {
}
groupTopicPartitions = append(groupTopicPartitions, groupTopicPartition)
log.Infof("groupTopicPartitions is: %v", groupTopicPartitions)
lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(true))
lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(false))
if err != nil {
return errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
}
for _, res := range lcgor.ConsumerGroupsTopicPartitions {
log.Infof("consumer group topic paritions is %v", res.String())
e.writer.OffsetWrite(res)
err := e.writer.OffsetWrite(res)
if err != nil {
return err
}
}
}
}
Expand Down
31 changes: 29 additions & 2 deletions impl/parquet_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package impl

import (
"encoding/json"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand Down Expand Up @@ -211,11 +212,11 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk
if len(offsetMessages) > 0 {
for _, offsetMessage := range offsetMessages {
var topicPartition kafka.TopicPartition
kafkaOffset := &topicPartition.Offset
err := kafkaOffset.Set(offsetMessage.Offset)
offset, err := modifyOffset(offsetMessage.Offset)
if err != nil {
return nil, errors.Wrapf(err, "Failed to set offset during consumer offset restore: %s", offsetMessage.Offset)
}
topicPartition.Offset = offset
topicPartition.Partition = offsetMessage.Partition
topicPartition.Topic = &offsetMessage.Topic
if val, ok := groupIDToPartitions[offsetMessage.GroupID]; !ok {
Expand All @@ -240,3 +241,29 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk

return res, nil
}

// modifyOffset converts a stored string offset into a kafka.Offset.
// The named logical offsets ("beginning"/"earliest", "end"/"latest",
// "unset"/"invalid", "stored") map to their kafka sentinel values.
// Any other value is parsed as a numeric offset and incremented by 1,
// so that a restored consumer resumes at the message AFTER the last
// exported one rather than re-consuming it.
func modifyOffset(offset string) (kafka.Offset, error) {
	switch offset {
	case "beginning", "earliest":
		// Sentinels already have type kafka.Offset; no conversion needed.
		return kafka.OffsetBeginning, nil
	case "end", "latest":
		return kafka.OffsetEnd, nil
	case "unset", "invalid":
		return kafka.OffsetInvalid, nil
	case "stored":
		return kafka.OffsetStored, nil
	default:
		// Parse as int64: kafka.Offset is an int64, and strconv.Atoi
		// would silently truncate large offsets on 32-bit platforms.
		off, err := strconv.ParseInt(offset, 10, 64)
		if err != nil {
			// Return an explicit invalid sentinel instead of a
			// misleading partially-populated value (old code
			// returned Offset(0+1) together with the error).
			return kafka.OffsetInvalid, err
		}
		return kafka.Offset(off + 1), nil
	}
}

0 comments on commit 9841257

Please sign in to comment.