diff --git a/impl/exporter.go b/impl/exporter.go index 2f94d11..d887140 100644 --- a/impl/exporter.go +++ b/impl/exporter.go @@ -44,10 +44,6 @@ type Writer interface { const defaultMaxWaitingTimeForNewMessage = time.Duration(30) * time.Second func (e *Exporter) Run() (exportedCount uint64, err error) { - err = e.StoreConsumerGroupOffset() - if err != nil { - return exportedCount, errors.Wrap(err, "unable to read consumer group") - } err = e.consumer.SubscribeTopics(e.topics, nil) if err != nil { return @@ -61,6 +57,10 @@ func (e *Exporter) Run() (exportedCount uint64, err error) { if err != nil { panic(err) } + err = e.StoreConsumerGroupOffset() + if err != nil { + panic(errors.Wrap(err, "unable to read consumer group")) + } os.Exit(1) }() defer func() { @@ -68,6 +68,10 @@ func (e *Exporter) Run() (exportedCount uint64, err error) { if err != nil { panic(err) } + err = e.StoreConsumerGroupOffset() + if err != nil { + panic(errors.Wrap(err, "unable to read consumer group")) + } }() maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage if e.options.MaxWaitingTimeForNewMessage != nil { @@ -201,13 +205,16 @@ func (e *Exporter) StoreConsumerGroupOffset() error { } groupTopicPartitions = append(groupTopicPartitions, groupTopicPartition) log.Infof("groupTopicPartitions is: %v", groupTopicPartitions) - lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(true)) + lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(false)) if err != nil { return errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions) } for _, res := range lcgor.ConsumerGroupsTopicPartitions { log.Infof("consumer group topic paritions is %v", res.String()) - e.writer.OffsetWrite(res) + err := e.writer.OffsetWrite(res) + if err != nil { + return err + } } } } diff --git a/impl/parquet_reader.go b/impl/parquet_reader.go index 8c2e5f1..76e96ef 100644 --- a/impl/parquet_reader.go +++ b/impl/parquet_reader.go @@ -2,6 +2,7 @@ package impl import ( "encoding/json" + "strconv" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -211,11 +212,11 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk if len(offsetMessages) > 0 { for _, offsetMessage := range offsetMessages { var topicPartition kafka.TopicPartition - kafkaOffset := &topicPartition.Offset - err := kafkaOffset.Set(offsetMessage.Offset) + offset, err := modifyOffset(offsetMessage.Offset) if err != nil { return nil, errors.Wrapf(err, "Failed to set offset during consumer offset restore: %s", offsetMessage.Offset) } + topicPartition.Offset = offset topicPartition.Partition = offsetMessage.Partition topicPartition.Topic = &offsetMessage.Topic if val, ok := groupIDToPartitions[offsetMessage.GroupID]; !ok { @@ -240,3 +241,29 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk return res, nil } + +func modifyOffset(offset string) (kafka.Offset, error) { + switch offset { + case "beginning": + fallthrough + case "earliest": + return kafka.Offset(kafka.OffsetBeginning), nil + + case "end": + fallthrough + case "latest": + return kafka.Offset(kafka.OffsetEnd), nil + + case "unset": + fallthrough + case "invalid": + return kafka.Offset(kafka.OffsetInvalid), nil + + case "stored": + return kafka.Offset(kafka.OffsetStored), nil + + default: + off, err := strconv.Atoi(offset) + return kafka.Offset(off + 1), err + } +}