enhancement: increase offset properly by 1
shubhamcoc committed Sep 27, 2023
1 parent 9841257 commit ac9d50d
Showing 4 changed files with 122 additions and 59 deletions.
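The fix turns on Kafka's committed-offset semantics: a committed offset names the next message a consumer group will read, not the last one it read, so re-committing an exported offset verbatim can replay a message. Below is a minimal sketch of the rule this commit applies on restore; the helper name and signature are illustrative, not part of the diff:

// nextCommitOffset mirrors the rule added to modifyOffset below: bump the
// stored offset by one so the restored consumer resumes after the last
// exported message, but leave it unchanged when it already equals the
// partition's high watermark, where there is nothing beyond it to skip.
func nextCommitOffset(stored, watermarkHigh int64) int64 {
	if stored == watermarkHigh {
		return stored
	}
	return stored + 1
}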
59 changes: 45 additions & 14 deletions impl/exporter.go
@@ -37,7 +37,7 @@ type Options struct {

type Writer interface {
Write(msg kafka.Message) error
OffsetWrite(msg kafka.ConsumerGroupTopicPartitions) error
OffsetWrite(msg kafkaOffsetMessage) error
Flush() error
}

@@ -57,21 +57,13 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
if err != nil {
panic(err)
}
err = e.StoreConsumerGroupOffset()
if err != nil {
panic(errors.Wrap(err, "unable to read consumer group"))
}
os.Exit(1)
}()
defer func() {
err = e.flushData()
if err != nil {
panic(err)
}
err = e.StoreConsumerGroupOffset()
if err != nil {
panic(errors.Wrap(err, "unable to read consumer group"))
}
}()
maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage
if e.options.MaxWaitingTimeForNewMessage != nil {
@@ -99,7 +91,11 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
}

func (e *Exporter) flushData() error {
err := e.writer.Flush()
err := e.StoreConsumerGroupOffset()
if err != nil {
panic(errors.Wrap(err, "unable to read consumer group"))
}
err = e.writer.Flush()
if err != nil {
return errors.Wrap(err, "Failed to flush writer")
}
@@ -174,7 +170,7 @@ func (e *Exporter) StoreConsumerGroupOffset() error {
// get metadata of a topic
metadata, err := e.consumer.GetMetadata(&topic, false, 5000)
if err != nil {
log.Errorf("error in getting metadata of topic: %w", err)
log.Errorf("error in getting metadata of topic %s: %w", topic, err)
return errors.Wrapf(err, "unable to get metadata of a topic: %s", topic)
}
log.Debugf("metadata is: %v", metadata)
@@ -191,14 +187,34 @@ func (e *Exporter) StoreConsumerGroupOffset() error {
topicToPartitionList[topic] = kafkaTopicPartitions
}

log.Infof("topic to partition is: %v", topicToPartitionList)
log.Infof("group name list is: %v", topicTogroupNameList)
log.Debugf("topic to partition is: %v", topicToPartitionList)
log.Debugf("group name list is: %v", topicTogroupNameList)

for _, topic := range e.topics {
groupList := topicTogroupNameList[topic]
for k := range groupList {
kafkaTopicPartitionsForFinal := make([]kafkaTopicPartition, 0)
groupTopicPartitions := make([]kafka.ConsumerGroupTopicPartitions, 0)
kafkaTopicPartitions := topicToPartitionList[topic]
// Get the watermarks for each partition of a topic
// and create a data structure to store the values
for _, partition := range kafkaTopicPartitions {
// Store the high and low watermark offset values for this partition
var store kafkaTopicPartition
low, high, err := e.consumer.GetWatermarkOffsets(topic, partition.Partition)
if err != nil {
log.Errorf("error in getting watermark offset of topic %s, partition %d: %w", topic, partition.Partition, err)
return errors.Wrapf(err, "unable to get metadata of a topic: %s, partition %d", topic, partition.Partition)
}
store.WOH = high
store.WOL = low
store.TopicPartition = partition
kafkaTopicPartitionsForFinal = append(kafkaTopicPartitionsForFinal, store)
}
log.Infof("kafkaTopicPartitionsForFinal is: %v", kafkaTopicPartitionsForFinal)
l := len(kafkaTopicPartitionsForFinal)
log.Infof("length of kafkaTopicPartitionsForFinal is: %d", l)
// required to fetch the current offset of the consumer group
groupTopicPartition := kafka.ConsumerGroupTopicPartitions{
Group: k,
Partitions: kafkaTopicPartitions,
@@ -209,9 +225,24 @@ func (e *Exporter) StoreConsumerGroupOffset() error {
if err != nil {
return errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
}
var finalRes kafkaOffsetMessage
// match each topic partition and update its offset with the consumer group's current offset
for _, res := range lcgor.ConsumerGroupsTopicPartitions {
log.Infof("consumer group topic paritions is %v", res.String())
err := e.writer.OffsetWrite(res)
finalRes.Group = res.Group
for i, customKafkaTopicPartition := range kafkaTopicPartitionsForFinal {
for _, kafkaTopicPartition := range res.Partitions {
if customKafkaTopicPartition.Partition == kafkaTopicPartition.Partition && *customKafkaTopicPartition.Topic == *kafkaTopicPartition.Topic {
log.Infof("comparing kafka topic partition: %v, %v", customKafkaTopicPartition.Partition, kafkaTopicPartition.Partition)
log.Infof("comparing kafka topic: %v, %v", *customKafkaTopicPartition.Topic, *kafkaTopicPartition.Topic)
kafkaTopicPartitionsForFinal[i].Offset = kafkaTopicPartition.Offset
}
}
}
log.Infof("final Result before substitution: %v", finalRes)
finalRes.KafkaTopicPartitions = kafkaTopicPartitionsForFinal
log.Infof("final Result after substitution: %v", finalRes)
err := e.writer.OffsetWrite(finalRes)
if err != nil {
return err
}
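The exporter change pairs each partition's consumer group offset with the partition's low and high watermarks, fetched via GetWatermarkOffsets, so the restore side can tell a fully consumed partition from one with messages still pending. A condensed sketch of that capture step, assuming a configured confluent-kafka-go *kafka.Consumer and the kafkaTopicPartition struct introduced in parquet_writer.go below:

// watermarksFor collects low/high watermarks for every partition of a topic.
// It is a simplified restatement of the loop added to StoreConsumerGroupOffset.
func watermarksFor(c *kafka.Consumer, topic string, partitions []kafka.TopicPartition) ([]kafkaTopicPartition, error) {
	out := make([]kafkaTopicPartition, 0, len(partitions))
	for _, p := range partitions {
		low, high, err := c.GetWatermarkOffsets(topic, p.Partition)
		if err != nil {
			return nil, errors.Wrapf(err, "unable to get watermark offsets of topic: %s, partition %d", topic, p.Partition)
		}
		out = append(out, kafkaTopicPartition{TopicPartition: p, WOL: low, WOH: high})
	}
	return out, nil
}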
15 changes: 10 additions & 5 deletions impl/importer.go
@@ -50,12 +50,13 @@ func NewImporter(log log.Logger, producer *kafka.Producer, deliveryChan chan kaf
}

type Reader interface {
ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message
ReadOffset() chan kafka.ConsumerGroupTopicPartitions
ReadMessage(time.Time, time.Time, chan int) chan kafka.Message
ReadOffset(chan int) chan kafka.ConsumerGroupTopicPartitions
}

func (i *Importer) Run(cfg kafka_utils.Config) error {
cx := make(chan os.Signal, 1)
doneChan := make(chan int)
signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
go func() {
<-cx
@@ -65,8 +66,8 @@ func (i *Importer) Run(cfg kafka_utils.Config) error {
defer func() {
i.producer.Flush(30 * 1000)
}()
offsetChan := i.reader.ReadOffset()
messageChn := i.reader.ReadMessage(i.restoreBefore, i.restoreAfter)
messageChn := i.reader.ReadMessage(i.restoreBefore, i.restoreAfter, doneChan)
offsetChan := i.reader.ReadOffset(doneChan)

for {
select {
@@ -90,7 +91,11 @@ func (i *Importer) Run(cfg kafka_utils.Config) error {
}
res, err := consumer.CommitOffsets(offsetMessage.Partitions)
if err != nil {
panic(errors.Wrap(err, "Unable to restore offsets of consumer"))
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no messages have been consumed, error is: %v", err)
} else {
panic(errors.Wrap(err, "Unable to restore offsets of consumer"))
}
}
i.logger.Infof("final result of commit offsets is: %v", res)
consumer.Close()
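The importer now forces message restore to finish before offsets are committed, handing a done channel from ReadMessage to ReadOffset. A minimal, self-contained sketch of that handshake, with the restore bodies reduced to prints:

package main

import "fmt"

func main() {
	done := make(chan int)          // signalled by the message-restore phase
	finished := make(chan struct{}) // lets main wait for the offset phase

	go func() { // message restore, as in ReadMessage
		fmt.Println("restoring kafka messages")
		done <- 0 // tell the offset phase it may start
		close(done)
	}()

	go func() { // offset restore, as in ReadOffset
		if v := <-done; v == 0 { // block until all messages are restored
			fmt.Println("restoring consumer group offsets")
		}
		close(finished)
	}()

	<-finished
}

Separately, CommitOffsets errors with code kafka.ErrNoOffset are now logged as warnings instead of panicking, since a group that never consumed anything has no offset to restore.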
72 changes: 42 additions & 30 deletions impl/parquet_reader.go
@@ -56,7 +56,7 @@ func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAn

const batchSize = 10

func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message {
func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time, doneChan chan int) chan kafka.Message {
ch := make(chan kafka.Message, batchSize)
if p.parquetReaderMessage == nil {
return ch
@@ -90,11 +90,14 @@ func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan
panic(errors.Wrap(err, "Failed to close fileReader"))
}
close(ch)
doneChan <- 0
close(doneChan)
log.Infof("kafka message restored successfully")
}()
return ch
}

func (p *ParquetReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions {
func (p *ParquetReader) ReadOffset(doneChan chan int) chan kafka.ConsumerGroupTopicPartitions {
ch := make(chan kafka.ConsumerGroupTopicPartitions, batchSize)
// When offset file is not given
if p.parquetReaderOffset == nil {
@@ -106,34 +109,40 @@ func (p *ParquetReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions {
if rowNum == 0 {
return ch
}
go func() {
for i := 0; i < rowNum/batchSize+1; i++ {
offsetMessages := make([]OffsetMessage, batchSize)
if err := p.parquetReaderOffset.Read(&offsetMessages); err != nil {
err = errors.Wrap(err, "Failed to bulk read messages from parquet file")
panic(err)
}
go func(doneChan chan int) {
// wait for all the messages to be restored first, then
// restore the kafka consumer group offset
val := <-doneChan
if val == 0 {
for i := 0; i < rowNum/batchSize+1; i++ {
offsetMessages := make([]OffsetMessage, batchSize)
if err := p.parquetReaderOffset.Read(&offsetMessages); err != nil {
err = errors.Wrap(err, "Failed to bulk read messages from parquet file")
panic(err)
}

resMessages, err := toKafkaConsumerGroupTopicPartitions(offsetMessages)
if err != nil {
err = errors.Wrapf(err, "Failed to parse offset messages from offset file")
panic(err)
}
resMessages, err := toKafkaConsumerGroupTopicPartitions(offsetMessages)
if err != nil {
err = errors.Wrapf(err, "Failed to parse offset messages from offset file")
panic(err)
}

for _, offsetMessage := range resMessages {
counter++
log.Infof("offset message is: %v", offsetMessage)
ch <- offsetMessage
log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
for _, offsetMessage := range resMessages {
counter++
log.Infof("offset message is: %v", offsetMessage)
ch <- offsetMessage
log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
}
}
p.parquetReaderOffset.ReadStop()
err := p.fileReaderOffset.Close()
if err != nil {
panic(errors.Wrap(err, "Failed to close fileReader"))
}
close(ch)
log.Infof("consumer offset restored successfully")
}
p.parquetReaderOffset.ReadStop()
err := p.fileReaderOffset.Close()
if err != nil {
panic(errors.Wrap(err, "Failed to close fileReader"))
}
close(ch)
}()
}(doneChan)
return ch
}

@@ -212,7 +221,7 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk
if len(offsetMessages) > 0 {
for _, offsetMessage := range offsetMessages {
var topicPartition kafka.TopicPartition
offset, err := modifyOffset(offsetMessage.Offset)
offset, err := modifyOffset(offsetMessage)
if err != nil {
return nil, errors.Wrapf(err, "Failed to set offset during consumer offset restore: %s", offsetMessage.Offset)
}
@@ -242,8 +251,8 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk
return res, nil
}

func modifyOffset(offset string) (kafka.Offset, error) {
switch offset {
func modifyOffset(OM OffsetMessage) (kafka.Offset, error) {
switch OM.Offset {
case "beginning":
fallthrough
case "earliest":
@@ -263,7 +272,10 @@ func modifyOffset(offset string) (kafka.Offset, error) {
return kafka.Offset(kafka.OffsetStored), nil

default:
off, err := strconv.Atoi(offset)
off, err := strconv.Atoi(OM.Offset)
if off == int(OM.WatermarkOffsetHigh) {
return kafka.Offset(off), err
}
return kafka.Offset(off + 1), err
}
}
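Passing the whole OffsetMessage into modifyOffset lets the numeric branch consult the exported high watermark before bumping. Hedged usage examples of the new behaviour (values hypothetical):

// A stored offset below the high watermark is advanced by one:
off, err := modifyOffset(OffsetMessage{Offset: "42", WatermarkOffsetHigh: 100})
// off == kafka.Offset(43)

// A stored offset already at the high watermark is committed as-is:
off, err = modifyOffset(OffsetMessage{Offset: "100", WatermarkOffsetHigh: 100})
// off == kafka.Offset(100)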
35 changes: 25 additions & 10 deletions impl/parquet_writer.go
@@ -50,10 +50,23 @@ type KafkaMessage struct {
}

type OffsetMessage struct {
GroupID string `parquet:"name=groupid, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Topic string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Partition int32 `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
Offset string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
GroupID string `parquet:"name=groupid, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Topic string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Partition int32 `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
Offset string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
WatermarkOffsetLow int64 `parquet:"name=watermarklow, type=INT64, convertedtype=INT_64"`
WatermarkOffsetHigh int64 `parquet:"name=watermakehigh, type=INT64, convertedtype=INT_64"`
}

type kafkaTopicPartition struct {
kafka.TopicPartition
WOH int64
WOL int64
}

type kafkaOffsetMessage struct {
Group string
KafkaTopicPartitions []kafkaTopicPartition
}

func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
@@ -80,13 +93,15 @@ func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
return err
}

func (f *ParquetWriter) OffsetWrite(msg kafka.ConsumerGroupTopicPartitions) (err error) {
for _, partition := range msg.Partitions {
func (f *ParquetWriter) OffsetWrite(msg kafkaOffsetMessage) (err error) {
for _, partition := range msg.KafkaTopicPartitions {
message := OffsetMessage{
GroupID: msg.Group,
Topic: *partition.Topic,
Partition: partition.Partition,
Offset: partition.Offset.String(),
GroupID: msg.Group,
Topic: *partition.Topic,
Partition: partition.Partition,
Offset: partition.Offset.String(),
WatermarkOffsetLow: partition.WOL,
WatermarkOffsetHigh: partition.WOH,
}

err = f.parquetWriterOffset.Write(message)
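The offset schema gains two INT64 columns for the watermarks; note the committed tags spell them watermarklow and watermakehigh, so any reader of these files must match those names. A minimal sketch of writing one row of the extended schema, assuming the xitongsys/parquet-go library (its struct tags match those used above) and the OffsetMessage type as defined in this file:

package main

import (
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

func main() {
	fw, err := local.NewLocalFileWriter("offsets.parquet") // hypothetical path
	if err != nil {
		panic(err)
	}
	defer fw.Close()

	pw, err := writer.NewParquetWriter(fw, new(OffsetMessage), 1)
	if err != nil {
		panic(err)
	}
	// One row per topic partition, now carrying both watermark offsets.
	if err := pw.Write(OffsetMessage{
		GroupID:             "example-group", // hypothetical values
		Topic:               "example-topic",
		Partition:           0,
		Offset:              "42",
		WatermarkOffsetLow:  0,
		WatermarkOffsetHigh: 100,
	}); err != nil {
		panic(err)
	}
	if err := pw.WriteStop(); err != nil {
		panic(err)
	}
}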
