feat: Adding restore based on timestamp
shubhamcoc committed Sep 20, 2023
1 parent 3f14f2a commit 85c2ac7
Showing 4 changed files with 140 additions and 88 deletions.
9 changes: 8 additions & 1 deletion cmd/importer.go
@@ -26,6 +26,8 @@ func CreateImportCmd() (*cobra.Command, error) {
var sslKeyLocation string
var includePartitionAndOffset bool
var clientid string
var restoreBefore string
var restoreAfter string

command := cobra.Command{
Use: "import",
@@ -85,7 +87,10 @@ func CreateImportCmd() (*cobra.Command, error) {
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
}
importer := impl.NewImporter(logger, producer, deliveryChan, parquetReader)
importer, err := impl.NewImporter(logger, producer, deliveryChan, parquetReader, restoreBefore, restoreAfter)
if err != nil {
panic(errors.Wrap(err, "unable to init importer"))
}
err = importer.Run(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Error while running importer"))
@@ -105,6 +110,8 @@ func CreateImportCmd() (*cobra.Command, error) {
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
command.Flags().StringVar(&clientid, "client-id", "", "Producer client id")
command.Flags().StringVar(&restoreBefore, "restore-before", "", "timestamp in RFC3339 format to restore data before this time")
command.Flags().StringVar(&restoreAfter, "restore-after", "", "timestamp in RFC3339 format to restore data after this time")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
err := command.MarkFlagRequired("file")
if err != nil {
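
The two new flags take RFC3339 timestamps, which NewImporter parses with time.Parse(time.RFC3339, ...) and normalizes to UTC (see impl/importer.go below). A minimal sketch of how such flag values parse, using illustrative timestamps that are not taken from this commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values only; any RFC3339 timestamp works, e.g. "2023-09-20T10:00:00+05:30".
	flagValues := []string{"2023-09-01T00:00:00Z", "2023-09-20T00:00:00Z"}
	for _, v := range flagValues {
		t, err := time.Parse(time.RFC3339, v)
		if err != nil {
			fmt.Println("invalid value:", err)
			continue
		}
		fmt.Println(t.UTC()) // NewImporter keeps the parsed bound in UTC
	}
}
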
135 changes: 72 additions & 63 deletions impl/exporter.go
@@ -44,6 +44,73 @@ type Writer interface {
const defaultMaxWaitingTimeForNewMessage = time.Duration(30) * time.Second

func (e *Exporter) Run() (exportedCount uint64, err error) {
err = e.StoreConsumerGroupOffset()
if err != nil {
return exportedCount, errors.Wrap(err, "unable to read consumer group")
}
err = e.consumer.SubscribeTopics(e.topics, nil)
if err != nil {
return
}
log.Infof("Subscribed topics: %s", e.topics)
cx := make(chan os.Signal, 1)
signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
go func() {
<-cx
err = e.flushData()
if err != nil {
panic(err)
}
os.Exit(1)
}()
defer func() {
err = e.flushData()
if err != nil {
panic(err)
}
}()
maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage
if e.options.MaxWaitingTimeForNewMessage != nil {
maxWaitingTimeForNewMessage = *e.options.MaxWaitingTimeForNewMessage
}
for {
msg, err := e.consumer.ReadMessage(maxWaitingTimeForNewMessage)
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
log.Infof("Waited for %s but no messages any more! Finish!", maxWaitingTimeForNewMessage)
}
return exportedCount, err
}
err = e.writer.Write(*msg)
if err != nil {
return exportedCount, err
}
exportedCount++
log.Infof("Exported message: %v (Total: %d)", msg.TopicPartition, exportedCount)
if e.options != nil && exportedCount == e.options.Limit {
log.Infof("Reached limit %d - Finish!", e.options.Limit)
return exportedCount, err
}
}
}

func (e *Exporter) flushData() error {
err := e.writer.Flush()
if err != nil {
return errors.Wrap(err, "Failed to flush writer")
}
_, err = e.consumer.Commit()
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
} else {
return errors.Wrap(err, "Failed to commit messages")
}
}
return nil
}

func (e *Exporter) StoreConsumerGroupOffset() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// state, err := kafka.ConsumerGroupStateFromString("stable")
@@ -57,7 +124,7 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {

listRes, err := e.admin.ListConsumerGroups(ctx, kafka.SetAdminRequestTimeout(5*time.Second))
if err != nil {
return exportedCount, errors.Wrap(err, "unable to list consumer groups")
return errors.Wrap(err, "unable to list consumer groups")
}

groupIds := make([]string, 0)
@@ -71,7 +138,7 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
if len(groupIds) > 0 {
groupRes, err := e.admin.DescribeConsumerGroups(ctx, groupIds, kafka.SetAdminRequestTimeout(5*time.Second))
if err != nil {
return exportedCount, errors.Wrapf(err, "unable to describe consumer groups %v", groupIds)
return errors.Wrapf(err, "unable to describe consumer groups %v", groupIds)
}
log.Infof("group result is: %v", groupRes)

@@ -102,7 +169,7 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
metadata, err := e.consumer.GetMetadata(&topic, false, 5000)
if err != nil {
log.Errorf("error in getting metadata of topic: %w", err)
return exportedCount, errors.Wrapf(err, "unable to get metadata of a topic: %s", topic)
return errors.Wrapf(err, "unable to get metadata of a topic: %s", topic)
}
log.Debugf("metadata is: %v", metadata)
topicMetadata := metadata.Topics[topic]
@@ -132,74 +199,16 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
}
groupTopicPartitions = append(groupTopicPartitions, groupTopicPartition)
log.Infof("groupTopicPartitions is: %v", groupTopicPartitions)
lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(false))
lcgor, err := e.admin.ListConsumerGroupOffsets(ctx, groupTopicPartitions, kafka.SetAdminRequireStableOffsets(true))
if err != nil {
return exportedCount, errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
return errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
}
for _, res := range lcgor.ConsumerGroupsTopicPartitions {
log.Infof("consumer group topic paritions is %v", res.String())
e.writer.OffsetWrite(res)
}
}
}
err = e.consumer.SubscribeTopics(e.topics, nil)
if err != nil {
return
}
log.Infof("Subscribed topics: %s", e.topics)
cx := make(chan os.Signal, 1)
signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
go func() {
<-cx
err = e.flushData()
if err != nil {
panic(err)
}
os.Exit(1)
}()
defer func() {
err = e.flushData()
if err != nil {
panic(err)
}
}()
maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage
if e.options.MaxWaitingTimeForNewMessage != nil {
maxWaitingTimeForNewMessage = *e.options.MaxWaitingTimeForNewMessage
}
for {
msg, err := e.consumer.ReadMessage(maxWaitingTimeForNewMessage)
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
log.Infof("Waited for %s but no messages any more! Finish!", maxWaitingTimeForNewMessage)
}
return exportedCount, err
}
err = e.writer.Write(*msg)
if err != nil {
return exportedCount, err
}
exportedCount++
log.Infof("Exported message: %v (Total: %d)", msg.TopicPartition, exportedCount)
if e.options != nil && exportedCount == e.options.Limit {
log.Infof("Reached limit %d - Finish!", e.options.Limit)
return exportedCount, err
}
}
}

func (e *Exporter) flushData() error {
err := e.writer.Flush()
if err != nil {
return errors.Wrap(err, "Failed to flush writer")
}
_, err = e.consumer.Commit()
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
} else {
return errors.Wrap(err, "Failed to commit messages")
}
}
return nil
}
47 changes: 35 additions & 12 deletions impl/importer.go
@@ -4,6 +4,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -12,23 +13,44 @@ import (
)

type Importer struct {
logger log.Logger
producer *kafka.Producer
reader Reader
deliveryChan chan kafka.Event
logger log.Logger
producer *kafka.Producer
reader Reader
deliveryChan chan kafka.Event
restoreBefore time.Time
restoreAfter time.Time
}

func NewImporter(log log.Logger, producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer {
return &Importer{
logger: log,
producer: producer,
reader: reader,
deliveryChan: deliveryChan,
func NewImporter(log log.Logger, producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader, restoreBefore, restoreAfter string) (*Importer, error) {
var restoreAfterTimeUTC, restoreBeforeTimeUTC time.Time

if restoreAfter != "" {
restoreAfterTime, err := time.Parse(time.RFC3339, restoreAfter)
if err != nil {
return nil, errors.Wrap(err, "Failed to init importer")
}
restoreAfterTimeUTC = restoreAfterTime.UTC()
}

if restoreBefore != "" {
restoreBeforeTime, err := time.Parse(time.RFC3339, restoreBefore)
if err != nil {
return nil, errors.Wrap(err, "Failed to init importer")
}
restoreBeforeTimeUTC = restoreBeforeTime.UTC()
}
return &Importer{
logger: log,
producer: producer,
reader: reader,
deliveryChan: deliveryChan,
restoreBefore: restoreBeforeTimeUTC,
restoreAfter: restoreAfterTimeUTC,
}, nil
}

type Reader interface {
ReadMessage() chan kafka.Message
ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message
ReadOffset() chan kafka.ConsumerGroupTopicPartitions
}

@@ -44,7 +66,7 @@ func (i *Importer) Run(cfg kafka_utils.Config) error {
i.producer.Flush(30 * 1000)
}()
offsetChan := i.reader.ReadOffset()
messageChn := i.reader.ReadMessage()
messageChn := i.reader.ReadMessage(i.restoreBefore, i.restoreAfter)

for {
select {
@@ -71,6 +93,7 @@ func (i *Importer) Run(cfg kafka_utils.Config) error {
panic(errors.Wrap(err, "Unable to restore offsets of consumer"))
}
i.logger.Infof("final result of commit offsets is: %v", res)
consumer.Close()
}
}
}
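
Because Reader.ReadMessage now takes the restore window, any other implementation of the interface (for example a test double) must accept the two timestamps even if it ignores them. A hypothetical stub, assuming the type and method bodies below which are not part of this commit:

package impl

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// stubReader is a hypothetical test double, not part of this commit.
type stubReader struct {
	messages []kafka.Message
}

func (s *stubReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message {
	ch := make(chan kafka.Message, len(s.messages))
	go func() {
		defer close(ch)
		for _, m := range s.messages {
			// A real reader would apply the restoreBefore/restoreAfter window here;
			// the stub forwards everything unchanged.
			ch <- m
		}
	}()
	return ch
}

func (s *stubReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions {
	ch := make(chan kafka.ConsumerGroupTopicPartitions)
	close(ch)
	return ch
}
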
37 changes: 25 additions & 12 deletions impl/parquet_reader.go
@@ -2,7 +2,6 @@ package impl

import (
"encoding/json"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
@@ -52,7 +51,7 @@ func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAn

const batchSize = 10

func (p *ParquetReader) ReadMessage() chan kafka.Message {
func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message {
rowNum := int(p.parquetReaderMessage.GetNumRows())
ch := make(chan kafka.Message, batchSize)
counter := 0
@@ -66,13 +65,15 @@ func (p *ParquetReader) ReadMessage() chan kafka.Message {

for _, parquetMessage := range kafkaMessages {
counter++
message, err := toKafkaMessage(parquetMessage, p.includePartitionAndOffset)
message, err := toKafkaMessage(parquetMessage, p.includePartitionAndOffset, restoreBefore, restoreAfter)
if err != nil {
err = errors.Wrapf(err, "Failed to parse kafka message from parquet message")
panic(err)
}
ch <- *message
log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
if message != nil {
ch <- *message
log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
}
}
}
p.parquetReaderMessage.ReadStop()
@@ -128,12 +129,24 @@ func (p *ParquetReader) GetNumberOfRowsInOffsetFile() int64 {
return p.parquetReaderOffset.GetNumRows()
}

func toKafkaMessage(message KafkaMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
func toKafkaMessage(message KafkaMessage, includePartitionAndOffset bool, restoreBefore, restoreAfter time.Time) (*kafka.Message, error) {
timestamp, err := time.Parse(time.RFC3339, message.Timestamp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to convert string to time.Time: %s", message.Timestamp)
}

if !restoreBefore.IsZero() {
if !timestamp.Before(restoreBefore) {
return nil, nil
}
}

if !restoreAfter.IsZero() {
if !timestamp.After(restoreAfter) {
return nil, nil
}
}

var headers []kafka.Header
if len(message.Headers) > 0 {
err := json.Unmarshal([]byte(message.Headers), &headers)
@@ -166,11 +179,11 @@ func toKafkaMessage(message KafkaMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
}

if includePartitionAndOffset {
offset, err := strconv.Atoi(message.Offset)
kafkaOffset := &kafkaMessage.TopicPartition.Offset
err = kafkaOffset.Set(message.Offset)
if err != nil {
return nil, errors.Wrapf(err, "Failed to convert string to int for message offset: %s", message.Offset)
return nil, errors.Wrapf(err, "Failed to set offset for message offset: %s", message.Offset)
}
kafkaMessage.TopicPartition.Offset = kafka.Offset(offset)
kafkaMessage.TopicPartition.Partition = message.Partition
}

Expand All @@ -183,11 +196,11 @@ func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafk
if len(offsetMessages) > 0 {
for _, offsetMessage := range offsetMessages {
var topicPartition kafka.TopicPartition
offset, err := strconv.Atoi(offsetMessage.Offset)
kafkaOffset := &topicPartition.Offset
err := kafkaOffset.Set(offsetMessage.Offset)
if err != nil {
return res, errors.Wrapf(err, "Failed to convert string to int for message offset: %s", offsetMessage.Offset)
return nil, errors.Wrapf(err, "Failed to set offset during consumer offset restore: %s", offsetMessage.Offset)
}
topicPartition.Offset = kafka.Offset(offset)
topicPartition.Partition = offsetMessage.Partition
topicPartition.Topic = &offsetMessage.Topic
if val, ok := groupIDToPartitions[offsetMessage.GroupID]; !ok {
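
toKafkaMessage now returns nil, nil to skip a message that falls outside the restore window; a zero time means the bound is unset, and both bounds are exclusive, so a message stamped exactly at --restore-before or --restore-after is dropped. A self-contained restatement of that check, for illustration only:

package main

import (
	"fmt"
	"time"
)

// withinRestoreWindow mirrors the checks in toKafkaMessage: a zero time means
// "no bound", and both bounds are exclusive.
func withinRestoreWindow(ts, restoreBefore, restoreAfter time.Time) bool {
	if !restoreBefore.IsZero() && !ts.Before(restoreBefore) {
		return false
	}
	if !restoreAfter.IsZero() && !ts.After(restoreAfter) {
		return false
	}
	return true
}

func main() {
	after, _ := time.Parse(time.RFC3339, "2023-09-01T00:00:00Z")
	before, _ := time.Parse(time.RFC3339, "2023-09-20T00:00:00Z")
	inside, _ := time.Parse(time.RFC3339, "2023-09-10T12:00:00Z")

	fmt.Println(withinRestoreWindow(inside, before, after)) // true: strictly inside the window
	fmt.Println(withinRestoreWindow(after, before, after))  // false: equal to restore-after is dropped
}
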
