feat: Adding parquet file for capturing offset and restore consumer group offsets
shubhamcoc committed Sep 18, 2023
1 parent 6d68949 commit 3f14f2a
Showing 7 changed files with 269 additions and 59 deletions.
31 changes: 25 additions & 6 deletions cmd/exporter.go
@@ -18,7 +18,8 @@ import (
 )
 
 func CreateExportCommand() (*cobra.Command, error) {
-	var filePath string
+	var messageFilePath string
+	var offsetFilePath string
 	var kafkaServers string
 	var kafkaUsername string
 	var kafkaPassword string
@@ -79,12 +80,12 @@ func CreateExportCommand() (*cobra.Command, error) {
 			go func(workerID int) {
 				defer wg.Done()
 				for {
-					outputFilePath := filePath
+					outputFilePath := messageFilePath
 					if exportLimitPerFile > 0 {
-						outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
+						outputFilePath = fmt.Sprintf("%s.%d", messageFilePath, time.Now().UnixMilli())
 					}
 					log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
-					fileWriter, err := createParquetFileWriter(
+					messageFileWriter, err := createParquetFileWriter(
 						Storage(storageType),
 						outputFilePath,
 						gcs_utils.Config{
@@ -96,7 +97,20 @@ func CreateExportCommand() (*cobra.Command, error) {
 					if err != nil {
 						panic(errors.Wrap(err, "[NewLocalFileWriter]"))
 					}
-					parquetWriter, err := impl.NewParquetWriter(*fileWriter)
+					offsetsFilePath := offsetFilePath
+					offsetFileWriter, err := createParquetFileWriter(
+						Storage(storageType),
+						offsetsFilePath,
+						gcs_utils.Config{
+							ProjectId:       gcsProjectID,
+							BucketName:      gcsBucketName,
+							CredentialsFile: googleCredentialsFile,
+						},
+					)
+					if err != nil {
+						panic(errors.Wrap(err, "[NewLocalFileWriter]"))
+					}
+					parquetWriter, err := impl.NewParquetWriter(*messageFileWriter, *offsetFileWriter)
 					if err != nil {
 						panic(errors.Wrap(err, "Unable to init parquet file writer"))
 					}
@@ -121,7 +135,8 @@ func CreateExportCommand() (*cobra.Command, error) {
 		},
 	}
 	command.Flags().StringVar(&storageType, "storage", "file", "Storage type: local file (file) or Google cloud storage (gcs)")
-	command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
+	command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing messages (required)")
+	command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offsets (required)")
 	command.Flags().StringVar(&googleCredentialsFile, "google-credentials", "", "Path to Google Credentials file")
 	command.Flags().StringVar(&gcsBucketName, "gcs-bucket", "", "Google Cloud Storage bucket name")
 	command.Flags().StringVar(&gcsProjectID, "gcs-project-id", "", "Google Cloud Storage Project ID")
@@ -148,6 +163,10 @@ func CreateExportCommand() (*cobra.Command, error) {
 	if err != nil {
 		return nil, err
 	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }

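Note: `createParquetFileWriter` abstracts over local files and GCS, and the `[NewLocalFileWriter]` error wrap suggests the xitongsys/parquet-go file sources underneath. A minimal local-only sketch of the two-writer setup introduced here (the file names are placeholders, and the direct use of `parquet-go-source` is an assumption, not this repo's exact wiring):

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
)

func main() {
	// One parquet file receives the exported messages...
	messageFileWriter, err := local.NewLocalFileWriter("messages.parquet")
	if err != nil {
		log.Fatalf("unable to create message file writer: %v", err)
	}
	defer messageFileWriter.Close()

	// ...and a second one receives the consumer group offsets captured with them.
	offsetFileWriter, err := local.NewLocalFileWriter("offsets.parquet")
	if err != nil {
		log.Fatalf("unable to create offset file writer: %v", err)
	}
	defer offsetFileWriter.Close()

	// Both writers would then be handed to impl.NewParquetWriter, as in the command above.
}
```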
39 changes: 31 additions & 8 deletions cmd/importer.go
@@ -1,6 +1,7 @@
 package cmd
 
 import (
+	"context"
 	"fmt"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
@@ -12,7 +13,8 @@ import (
 )
 
 func CreateImportCmd() (*cobra.Command, error) {
-	var filePath string
+	var messageFilePath string
+	var offsetFilePath string
 	var kafkaServers string
 	var kafkaUsername string
 	var kafkaPassword string
@@ -28,8 +30,9 @@ func CreateImportCmd() (*cobra.Command, error) {
 	command := cobra.Command{
 		Use: "import",
 		Run: func(cmd *cobra.Command, args []string) {
-			log.Infof("Input file: %s", filePath)
-			parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
+			logger := log.WithContext(context.Background())
+			logger.Infof("Input file: %s", messageFilePath)
+			parquetReader, err := impl.NewParquetReader(messageFilePath, offsetFilePath, includePartitionAndOffset)
 			if err != nil {
 				panic(errors.Wrap(err, "Unable to init parquet file reader"))
 			}
@@ -48,7 +51,7 @@ func CreateImportCmd() (*cobra.Command, error) {
 				SSLKeyLocation:        sslKeyLocation,
 				SSLCertLocation:       sslCertLocation,
 				SSLKeyPassword:        sslKeyPassword,
-				EnableAutoOffsetStore: false,
+				EnableAutoOffsetStore: true,
 				ClientID:              clientid,
 			}
 			producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
@@ -66,19 +69,31 @@ func CreateImportCmd() (*cobra.Command, error) {
 					if m.TopicPartition.Error != nil {
 						panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
 					} else {
-						log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
+						logger.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
 							*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
 					}
 				}
 			}()
-			importer := impl.NewImporter(producer, deliveryChan, parquetReader)
-			err = importer.Run()
+			kafkaConsumerConfig := kafka_utils.Config{
+				BootstrapServers: kafkaServers,
+				SecurityProtocol: kafkaSecurityProtocol,
+				SASLMechanism:    kafkaSASKMechanism,
+				SASLUsername:     kafkaUsername,
+				SASLPassword:     kafkaPassword,
+				SSLCALocation:    sslCaLocation,
+				SSLKeyPassword:   sslKeyPassword,
+				SSLKeyLocation:   sslKeyLocation,
+				SSLCertLocation:  sslCertLocation,
+			}
+			importer := impl.NewImporter(logger, producer, deliveryChan, parquetReader)
+			err = importer.Run(kafkaConsumerConfig)
 			if err != nil {
 				panic(errors.Wrap(err, "Error while running importer"))
 			}
 		},
 	}
-	command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
+	command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Input file path containing stored messages (required)")
+	command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Input file path containing stored offsets (required)")
 	command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
 	command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
 	command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
@@ -91,5 +106,13 @@ func CreateImportCmd() (*cobra.Command, error) {
 	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
 	command.Flags().StringVar(&clientid, "client-id", "", "Producer client id")
 	command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
+	err := command.MarkFlagRequired("file")
+	if err != nil {
+		return nil, err
+	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }
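For reference, the delivery goroutine above follows confluent-kafka-go's standard produce-then-wait pattern; a standalone sketch (broker address and topic name are placeholders):

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	topic := "demo-topic"
	deliveryChan := make(chan kafka.Event, 1)
	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("payload"),
	}, deliveryChan)
	if err != nil {
		panic(err)
	}

	// Each Produce call reports its outcome on the delivery channel.
	e := <-deliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		fmt.Printf("delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("delivered to %s [%d] @ %v\n",
			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
	}
}
```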
15 changes: 11 additions & 4 deletions cmd/parquet_row_counter.go
@@ -8,22 +8,29 @@ import (
 )
 
 func CreateCountParquetRowCommand() (*cobra.Command, error) {
-	var filePath string
+	var filePathMessage string
+	var filePathOffset string
 
 	command := cobra.Command{
 		Use: "count-parquet-rows",
 		Run: func(cmd *cobra.Command, args []string) {
-			parquetReader, err := impl.NewParquetReader(filePath, false)
+			parquetReader, err := impl.NewParquetReader(filePathMessage, filePathOffset, false)
 			if err != nil {
 				panic(errors.Wrap(err, "Unable to init parquet file reader"))
 			}
-			log.Infof("Number of rows: %d", parquetReader.GetNumberOfRows())
+			log.Infof("Number of rows in message file: %d", parquetReader.GetNumberOfRowsInMessageFile())
+			log.Infof("Number of rows in offset file: %d", parquetReader.GetNumberOfRowsInOffsetFile())
 		},
 	}
-	command.Flags().StringVarP(&filePath, "file", "f", "", "File path (required)")
+	command.Flags().StringVarP(&filePathMessage, "file", "f", "", "File path of stored Kafka messages (required)")
+	command.Flags().StringVarP(&filePathOffset, "offset-file", "o", "", "File path of stored Kafka offsets (required)")
 	err := command.MarkFlagRequired("file")
 	if err != nil {
 		return nil, err
 	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }
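`GetNumberOfRowsInMessageFile` and `GetNumberOfRowsInOffsetFile` are this repo's wrappers; parquet-go itself exposes the row count from file metadata. A minimal sketch, assuming the xitongsys/parquet-go reader and placeholder file names:

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

func countRows(path string) int64 {
	fr, err := local.NewLocalFileReader(path)
	if err != nil {
		log.Fatalf("unable to open %s: %v", path, err)
	}
	defer fr.Close()

	// nil schema object: only the footer metadata is needed, not row decoding.
	pr, err := reader.NewParquetReader(fr, nil, 1)
	if err != nil {
		log.Fatalf("unable to read %s: %v", path, err)
	}
	defer pr.ReadStop()

	return pr.GetNumRows()
}

func main() {
	log.Printf("messages: %d", countRows("messages.parquet"))
	log.Printf("offsets: %d", countRows("offsets.parquet"))
}
```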
6 changes: 4 additions & 2 deletions impl/exporter.go
@@ -37,6 +37,7 @@ type Options struct {
 
 type Writer interface {
 	Write(msg kafka.Message) error
+	OffsetWrite(msg kafka.ConsumerGroupTopicPartitions) error
 	Flush() error
 }
 
@@ -122,7 +123,7 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
 
 	for _, topic := range e.topics {
 		groupList := topicTogroupNameList[topic]
-		for k, _ := range groupList {
+		for k := range groupList {
 			groupTopicPartitions := make([]kafka.ConsumerGroupTopicPartitions, 0)
 			kafkaTopicPartitions := topicToPartitionList[topic]
 			groupTopicPartition := kafka.ConsumerGroupTopicPartitions{
@@ -136,7 +137,11 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
 				return exportedCount, errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
 			}
 			for _, res := range lcgor.ConsumerGroupsTopicPartitions {
-				log.Infof("consumer group topic paritions is %v", res)
+				log.Infof("consumer group topic partitions is %v", res.String())
+				// Surface write failures instead of silently dropping offsets.
+				err = e.writer.OffsetWrite(res)
+				if err != nil {
+					return exportedCount, errors.Wrap(err, "unable to write consumer group offsets")
+				}
 			}
 		}
 	}
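The `lcgor` result above comes from the admin client's `ListConsumerGroupOffsets`; a standalone sketch of that call with confluent-kafka-go v2 (broker, group, and topic are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	topic := "demo-topic"
	// Ask for the committed offsets of one group on one topic/partition.
	lcgor, err := admin.ListConsumerGroupOffsets(ctx, []kafka.ConsumerGroupTopicPartitions{{
		Group:      "demo-group",
		Partitions: []kafka.TopicPartition{{Topic: &topic, Partition: 0}},
	}})
	if err != nil {
		panic(err)
	}
	for _, res := range lcgor.ConsumerGroupsTopicPartitions {
		fmt.Printf("consumer group topic partitions: %s\n", res.String())
	}
}
```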
44 changes: 35 additions & 9 deletions impl/importer.go
@@ -6,28 +6,33 @@
 	"syscall"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/huantt/kafka-dump/pkg/kafka_utils"
+	"github.com/huantt/kafka-dump/pkg/log"
 	"github.com/pkg/errors"
 )
 
 type Importer struct {
+	logger       log.Logger
 	producer     *kafka.Producer
 	reader       Reader
 	deliveryChan chan kafka.Event
 }
 
-func NewImporter(producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer {
+func NewImporter(log log.Logger, producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer {
 	return &Importer{
+		logger:       log,
 		producer:     producer,
 		reader:       reader,
 		deliveryChan: deliveryChan,
 	}
 }
 
 type Reader interface {
-	Read() chan kafka.Message
+	ReadMessage() chan kafka.Message
+	ReadOffset() chan kafka.ConsumerGroupTopicPartitions
 }
 
-func (i *Importer) Run() error {
+func (i *Importer) Run(cfg kafka_utils.Config) error {
 	cx := make(chan os.Signal, 1)
 	signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
 	go func() {
@@ -38,13 +43,43 @@ func (i *Importer) Run() error {
 	defer func() {
 		i.producer.Flush(30 * 1000)
 	}()
-	messageChn := i.reader.Read()
+	offsetChan := i.reader.ReadOffset()
+	messageChn := i.reader.ReadMessage()
 
-	for message := range messageChn {
-		err := i.producer.Produce(&message, i.deliveryChan)
-		if err != nil {
-			return errors.Wrapf(err, "Failed to produce message: %s", string(message.Value))
-		}
+	// Drain both channels; a closed channel is set to nil so that select
+	// stops spinning on it (a receive from a nil channel blocks forever).
+	for messageChn != nil || offsetChan != nil {
+		select {
+		case message, ok := <-messageChn:
+			if !ok {
+				messageChn = nil
+				continue
+			}
+			err := i.producer.Produce(&message, i.deliveryChan)
+			if err != nil {
+				return errors.Wrapf(err, "Failed to produce message: %s", string(message.Value))
+			}
+
+		case offsetMessage, ok := <-offsetChan:
+			if !ok {
+				offsetChan = nil
+				continue
+			}
+			cfg.GroupId = offsetMessage.Group
+			consumer, err := kafka_utils.NewConsumer(cfg)
+			if err != nil {
+				panic(errors.Wrap(err, "Unable to init consumer"))
+			}
+			res, err := consumer.CommitOffsets(offsetMessage.Partitions)
+			if err != nil {
+				panic(errors.Wrap(err, "Unable to restore offsets of consumer"))
+			}
+			i.logger.Infof("final result of commit offsets is: %v", res)
+			// Close the per-group consumer so each restore does not leak a connection.
+			if err = consumer.Close(); err != nil {
+				panic(errors.Wrap(err, "Unable to close consumer"))
+			}
+		}
 	}
 	return nil
 }
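Restoring a group therefore reduces to `Consumer.CommitOffsets` with the recorded topic/partition/offset triples; a standalone sketch (broker, group, topic, and offset are placeholders):

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	topic := "demo-topic"
	restored := kafka.ConsumerGroupTopicPartitions{
		Group: "demo-group",
		Partitions: []kafka.TopicPartition{
			{Topic: &topic, Partition: 0, Offset: kafka.Offset(42)},
		},
	}

	// The consumer must be created with the group whose offsets are being restored.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          restored.Group,
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// CommitOffsets writes the given offsets for this group to the broker.
	committed, err := consumer.CommitOffsets(restored.Partitions)
	if err != nil {
		panic(err)
	}
	fmt.Printf("committed: %v\n", committed)
}
```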