diff --git a/cmd/exporter.go b/cmd/exporter.go
index 218339d..6bb6859 100644
--- a/cmd/exporter.go
+++ b/cmd/exporter.go
@@ -18,7 +18,8 @@ import (
 )
 
 func CreateExportCommand() (*cobra.Command, error) {
-	var filePath string
+	var messageFilePath string
+	var offsetFilePath string
 	var kafkaServers string
 	var kafkaUsername string
 	var kafkaPassword string
@@ -79,12 +80,12 @@ func CreateExportCommand() (*cobra.Command, error) {
 			go func(workerID int) {
 				defer wg.Done()
 				for {
-					outputFilePath := filePath
+					outputFilePath := messageFilePath
 					if exportLimitPerFile > 0 {
-						outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
+						outputFilePath = fmt.Sprintf("%s.%d", messageFilePath, time.Now().UnixMilli())
 					}
 					log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
-					fileWriter, err := createParquetFileWriter(
+					messageFileWriter, err := createParquetFileWriter(
 						Storage(storageType),
 						outputFilePath,
 						gcs_utils.Config{
@@ -96,7 +97,20 @@ func CreateExportCommand() (*cobra.Command, error) {
 					if err != nil {
 						panic(errors.Wrap(err, "[NewLocalFileWriter]"))
 					}
-					parquetWriter, err := impl.NewParquetWriter(*fileWriter)
+					offsetsFilePath := offsetFilePath
+					offsetFileWriter, err := createParquetFileWriter(
+						Storage(storageType),
+						offsetsFilePath,
+						gcs_utils.Config{
+							ProjectId:       gcsProjectID,
+							BucketName:      gcsBucketName,
+							CredentialsFile: googleCredentialsFile,
+						},
+					)
+					if err != nil {
+						panic(errors.Wrap(err, "[NewLocalFileWriter]"))
+					}
+					parquetWriter, err := impl.NewParquetWriter(*messageFileWriter, *offsetFileWriter)
 					if err != nil {
 						panic(errors.Wrap(err, "Unable to init parquet file writer"))
 					}
@@ -121,7 +135,8 @@ func CreateExportCommand() (*cobra.Command, error) {
 		},
 	}
 	command.Flags().StringVar(&storageType, "storage", "file", "Storage type: local file (file) or Google cloud storage (gcs)")
-	command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
+	command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)")
+	command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset (required)")
 	command.Flags().StringVar(&googleCredentialsFile, "google-credentials", "", "Path to Google Credentials file")
 	command.Flags().StringVar(&gcsBucketName, "gcs-bucket", "", "Google Cloud Storage bucket name")
 	command.Flags().StringVar(&gcsProjectID, "gcs-project-id", "", "Google Cloud Storage Project ID")
@@ -148,6 +163,10 @@ func CreateExportCommand() (*cobra.Command, error) {
 	if err != nil {
 		return nil, err
 	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }
 
diff --git a/cmd/importer.go b/cmd/importer.go
index 5c0b6a2..49eaf47 100644
--- a/cmd/importer.go
+++ b/cmd/importer.go
@@ -1,6 +1,7 @@
 package cmd
 
 import (
+	"context"
 	"fmt"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
@@ -12,7 +13,8 @@ import (
 )
 
 func CreateImportCmd() (*cobra.Command, error) {
-	var filePath string
+	var messageFilePath string
+	var offsetFilePath string
 	var kafkaServers string
 	var kafkaUsername string
 	var kafkaPassword string
@@ -28,8 +30,9 @@ func CreateImportCmd() (*cobra.Command, error) {
 	command := cobra.Command{
 		Use: "import",
 		Run: func(cmd *cobra.Command, args []string) {
-			log.Infof("Input file: %s", filePath)
-			parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
+			logger := log.WithContext(context.Background())
+			logger.Infof("Input file: %s", messageFilePath)
+			parquetReader, err := impl.NewParquetReader(messageFilePath, offsetFilePath, includePartitionAndOffset)
 			if err != nil {
 				panic(errors.Wrap(err, "Unable to init parquet file reader"))
 			}
@@ -48,7 +51,7 @@ func CreateImportCmd() (*cobra.Command, error) {
 				SSLKeyLocation:        sslKeyLocation,
 				SSLCertLocation:       sslCertLocation,
 				SSLKeyPassword:        sslKeyPassword,
-				EnableAutoOffsetStore: false,
+				EnableAutoOffsetStore: true,
 				ClientID:              clientid,
 			}
 			producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
@@ -66,19 +69,31 @@ func CreateImportCmd() (*cobra.Command, error) {
 					if m.TopicPartition.Error != nil {
 						panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
 					} else {
-						log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
+						logger.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
 							*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
 					}
 				}
 			}()
-			importer := impl.NewImporter(producer, deliveryChan, parquetReader)
-			err = importer.Run()
+			kafkaConsumerConfig := kafka_utils.Config{
+				BootstrapServers: kafkaServers,
+				SecurityProtocol: kafkaSecurityProtocol,
+				SASLMechanism:    kafkaSASKMechanism,
+				SASLUsername:     kafkaUsername,
+				SASLPassword:     kafkaPassword,
+				SSLCALocation:    sslCaLocation,
+				SSLKeyPassword:   sslKeyPassword,
+				SSLKeyLocation:   sslKeyLocation,
+				SSLCertLocation:  sslCertLocation,
+			}
+			importer := impl.NewImporter(logger, producer, deliveryChan, parquetReader)
+			err = importer.Run(kafkaConsumerConfig)
 			if err != nil {
 				panic(errors.Wrap(err, "Error while running importer"))
 			}
 		},
 	}
-	command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
+	command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)")
+	command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset (required)")
 	command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
 	command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
 	command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
@@ -91,5 +106,13 @@ func CreateImportCmd() (*cobra.Command, error) {
 	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
 	command.Flags().StringVar(&clientid, "client-id", "", "Producer client id")
 	command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
+	err := command.MarkFlagRequired("file")
+	if err != nil {
+		return nil, err
+	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }
diff --git a/cmd/parquet_row_counter.go b/cmd/parquet_row_counter.go
index 8fdde14..f16609e 100644
--- a/cmd/parquet_row_counter.go
+++ b/cmd/parquet_row_counter.go
@@ -8,22 +8,29 @@ import (
 )
 
 func CreateCountParquetRowCommand() (*cobra.Command, error) {
-	var filePath string
+	var filePathMessage string
+	var filePathOffset string
 
 	command := cobra.Command{
 		Use: "count-parquet-rows",
 		Run: func(cmd *cobra.Command, args []string) {
-			parquetReader, err := impl.NewParquetReader(filePath, false)
+			parquetReader, err := impl.NewParquetReader(filePathMessage, filePathOffset, false)
 			if err != nil {
 				panic(errors.Wrap(err, "Unable to init parquet file reader"))
 			}
-			log.Infof("Number of rows: %d", parquetReader.GetNumberOfRows())
+			log.Infof("Number of rows in message file: %d", parquetReader.GetNumberOfRowsInMessageFile())
+			log.Infof("Number of rows in offset file: %d", parquetReader.GetNumberOfRowsInOffsetFile())
 		},
 	}
-	command.Flags().StringVarP(&filePath, "file", "f", "", "File path (required)")
+	command.Flags().StringVarP(&filePathMessage, "file", "f", "", "File path of stored kafka message (required)")
+	command.Flags().StringVarP(&filePathOffset, "offset-file", "o", "", "File path of stored kafka offset (required)")
 	err := command.MarkFlagRequired("file")
 	if err != nil {
 		return nil, err
 	}
+	err = command.MarkFlagRequired("offset-file")
+	if err != nil {
+		return nil, err
+	}
 	return &command, nil
 }
diff --git a/impl/exporter.go b/impl/exporter.go
index f3b0f9d..e876807 100644
--- a/impl/exporter.go
+++ b/impl/exporter.go
@@ -37,6 +37,7 @@ type Options struct {
 
 type Writer interface {
 	Write(msg kafka.Message) error
+	OffsetWrite(msg kafka.ConsumerGroupTopicPartitions) error
 	Flush() error
 }
 
@@ -122,7 +123,7 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
 
 	for _, topic := range e.topics {
 		groupList := topicTogroupNameList[topic]
-		for k, _ := range groupList {
+		for k := range groupList {
 			groupTopicPartitions := make([]kafka.ConsumerGroupTopicPartitions, 0)
 			kafkaTopicPartitions := topicToPartitionList[topic]
 			groupTopicPartition := kafka.ConsumerGroupTopicPartitions{
@@ -136,7 +137,8 @@ func (e *Exporter) Run() (exportedCount uint64, err error) {
 				return exportedCount, errors.Wrapf(err, "unable to list consumer groups offsets %v", groupTopicPartitions)
 			}
 			for _, res := range lcgor.ConsumerGroupsTopicPartitions {
-				log.Infof("consumer group topic paritions is %v", res)
+				log.Infof("consumer group topic paritions is %v", res.String())
+				e.writer.OffsetWrite(res)
 			}
 		}
 	}
diff --git a/impl/importer.go b/impl/importer.go
index 038c36d..c5f944f 100644
--- a/impl/importer.go
+++ b/impl/importer.go
@@ -6,17 +6,21 @@ import (
 	"syscall"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/huantt/kafka-dump/pkg/kafka_utils"
+	"github.com/huantt/kafka-dump/pkg/log"
 	"github.com/pkg/errors"
 )
 
 type Importer struct {
+	logger       log.Logger
 	producer     *kafka.Producer
 	reader       Reader
 	deliveryChan chan kafka.Event
 }
 
-func NewImporter(producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer {
+func NewImporter(log log.Logger, producer *kafka.Producer, deliveryChan chan kafka.Event, reader Reader) *Importer {
 	return &Importer{
+		logger:       log,
 		producer:     producer,
 		reader:       reader,
 		deliveryChan: deliveryChan,
@@ -24,10 +28,11 @@ func NewImporter(producer *kafka.Producer, deliveryChan chan kafka.Event, reader
 }
 
 type Reader interface {
-	Read() chan kafka.Message
+	ReadMessage() chan kafka.Message
+	ReadOffset() chan kafka.ConsumerGroupTopicPartitions
 }
 
-func (i *Importer) Run() error {
+func (i *Importer) Run(cfg kafka_utils.Config) error {
 	cx := make(chan os.Signal, 1)
 	signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
 	go func() {
@@ -38,13 +43,34 @@ func (i *Importer) Run() error {
 	defer func() {
 		i.producer.Flush(30 * 1000)
 	}()
-	messageChn := i.reader.Read()
+	offsetChan := i.reader.ReadOffset()
+	messageChn := i.reader.ReadMessage()
 
-	for message := range messageChn {
-		err := i.producer.Produce(&message, i.deliveryChan)
-		if err != nil {
-			return errors.Wrapf(err, "Failed to produce message: %s", string(message.Value))
+	for {
+		select {
+		case message, ok := <-messageChn:
+			if !ok {
+				break
+			}
+			err := i.producer.Produce(&message, i.deliveryChan)
+			if err != nil {
+				return errors.Wrapf(err, "Failed to produce message: %s", string(message.Value))
+			}
+
+		case offsetMessage, ok := <-offsetChan:
+			if !ok {
+				continue
+			}
+			cfg.GroupId = offsetMessage.Group
+			consumer, err := kafka_utils.NewConsumer(cfg)
+			if err != nil {
+				panic(errors.Wrap(err, "Unable to init consumer"))
+			}
+			res, err := consumer.CommitOffsets(offsetMessage.Partitions)
+			if err != nil {
+				panic(errors.Wrap(err, "Unable to restore offsets of consumer"))
+			}
+			i.logger.Infof("final result of commit offsets is: %v", res)
 		}
 	}
-	return nil
 }
diff --git a/impl/parquet_reader.go b/impl/parquet_reader.go
index f387184..df232b5 100644
--- a/impl/parquet_reader.go
+++ b/impl/parquet_reader.go
@@ -14,43 +14,57 @@ import (
 )
 
 type ParquetReader struct {
-	parquetReader             *reader.ParquetReader
-	fileReader                source.ParquetFile
+	parquetReaderMessage      *reader.ParquetReader
+	parquetReaderOffset       *reader.ParquetReader
+	fileReaderMessage         source.ParquetFile
+	fileReaderOffset          source.ParquetFile
 	includePartitionAndOffset bool
 }
 
-func NewParquetReader(filePath string, includePartitionAndOffset bool) (*ParquetReader, error) {
-	fr, err := local.NewLocalFileReader(filePath)
+func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAndOffset bool) (*ParquetReader, error) {
+	fileReaderMessage, err := local.NewLocalFileReader(filePathMessage)
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to init file reader")
 	}
 
-	parquetReader, err := reader.NewParquetReader(fr, new(ParquetMessage), 4)
+	parquetReaderMessage, err := reader.NewParquetReader(fileReaderMessage, new(KafkaMessage), 9)
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to init parquet reader")
+	}
+
+	fileReaderOffset, err := local.NewLocalFileReader(filePathOffset)
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to init file reader")
+	}
+
+	parquetReaderOffset, err := reader.NewParquetReader(fileReaderOffset, new(OffsetMessage), 4)
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to init parquet reader")
 	}
 	return &ParquetReader{
-		fileReader:                fr,
-		parquetReader:             parquetReader,
+		fileReaderMessage:         fileReaderMessage,
+		fileReaderOffset:          fileReaderOffset,
+		parquetReaderMessage:      parquetReaderMessage,
+		parquetReaderOffset:       parquetReaderOffset,
 		includePartitionAndOffset: includePartitionAndOffset,
 	}, nil
 }
 
 const batchSize = 10
 
-func (p *ParquetReader) Read() chan kafka.Message {
-	rowNum := int(p.parquetReader.GetNumRows())
+func (p *ParquetReader) ReadMessage() chan kafka.Message {
+	rowNum := int(p.parquetReaderMessage.GetNumRows())
 	ch := make(chan kafka.Message, batchSize)
 	counter := 0
 	go func() {
 		for i := 0; i < rowNum/batchSize+1; i++ {
-			parquetMessages := make([]ParquetMessage, batchSize)
-			if err := p.parquetReader.Read(&parquetMessages); err != nil {
+			kafkaMessages := make([]KafkaMessage, batchSize)
+			if err := p.parquetReaderMessage.Read(&kafkaMessages); err != nil {
 				err = errors.Wrap(err, "Failed to bulk read messages from parquet file")
 				panic(err)
 			}
 
-			for _, parquetMessage := range parquetMessages {
+			for _, parquetMessage := range kafkaMessages {
 				counter++
 				message, err := toKafkaMessage(parquetMessage, p.includePartitionAndOffset)
 				if err != nil {
@@ -61,20 +75,60 @@ func (p *ParquetReader) Read() chan kafka.Message {
 				log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
 			}
 		}
-		p.parquetReader.ReadStop()
-		err := p.fileReader.Close()
+		p.parquetReaderMessage.ReadStop()
+		err := p.fileReaderMessage.Close()
+		if err != nil {
+			panic(errors.Wrap(err, "Failed to close fileReader"))
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+func (p *ParquetReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions {
+	rowNum := int(p.parquetReaderOffset.GetNumRows())
+	ch := make(chan kafka.ConsumerGroupTopicPartitions, batchSize)
+	counter := 0
+	go func() {
+		for i := 0; i < rowNum/batchSize+1; i++ {
+			offsetMessages := make([]OffsetMessage, batchSize)
+			if err := p.parquetReaderOffset.Read(&offsetMessages); err != nil {
+				err = errors.Wrap(err, "Failed to bulk read messages from parquet file")
+				panic(err)
+			}
+
+			resMessages, err := toKafkaConsumerGroupTopicPartitions(offsetMessages)
+			if err != nil {
+				err = errors.Wrapf(err, "Failed to parse offset messages from offset file")
+				panic(err)
+			}
+
+			for _, offsetMessage := range resMessages {
+				counter++
+				log.Infof("offset message is: %v", offsetMessage)
+				ch <- offsetMessage
+				log.Infof("Loaded %f% (%d/%d)", counter/rowNum, counter, rowNum)
+			}
+		}
+		p.parquetReaderOffset.ReadStop()
+		err := p.fileReaderOffset.Close()
 		if err != nil {
 			panic(errors.Wrap(err, "Failed to close fileReader"))
 		}
+		close(ch)
 	}()
 	return ch
 }
 
-func (p *ParquetReader) GetNumberOfRows() int64 {
-	return p.parquetReader.GetNumRows()
+func (p *ParquetReader) GetNumberOfRowsInMessageFile() int64 {
+	return p.parquetReaderMessage.GetNumRows()
+}
+
+func (p *ParquetReader) GetNumberOfRowsInOffsetFile() int64 {
+	return p.parquetReaderOffset.GetNumRows()
 }
 
-func toKafkaMessage(message ParquetMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
+func toKafkaMessage(message KafkaMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
 	timestamp, err := time.Parse(time.RFC3339, message.Timestamp)
 	if err != nil {
 		return nil, errors.Wrapf(err, "Failed to convert string to time.Time: %s", message.Timestamp)
@@ -122,3 +176,39 @@ func toKafkaMessage(message ParquetMessage, includePartitionAndOffset bool) (*ka
 
 	return kafkaMessage, nil
 }
+
+func toKafkaConsumerGroupTopicPartitions(offsetMessages []OffsetMessage) ([]kafka.ConsumerGroupTopicPartitions, error) {
+	res := make([]kafka.ConsumerGroupTopicPartitions, 0)
+	groupIDToPartitions := make(map[string][]kafka.TopicPartition)
+	if len(offsetMessages) > 0 {
+		for _, offsetMessage := range offsetMessages {
+			var topicPartition kafka.TopicPartition
+			offset, err := strconv.Atoi(offsetMessage.Offset)
+			if err != nil {
+				return res, errors.Wrapf(err, "Failed to convert string to int for message offset: %s", offsetMessage.Offset)
+			}
+			topicPartition.Offset = kafka.Offset(offset)
+			topicPartition.Partition = offsetMessage.Partition
+			topicPartition.Topic = &offsetMessage.Topic
+			if val, ok := groupIDToPartitions[offsetMessage.GroupID]; !ok {
+				topicPartitions := make(kafka.TopicPartitions, 0)
+				topicPartitions = append(topicPartitions, topicPartition)
+				groupIDToPartitions[offsetMessage.GroupID] = topicPartitions
+			} else {
+				val = append(val, topicPartition)
+				groupIDToPartitions[offsetMessage.GroupID] = val
+			}
+		}
+
+		for k, v := range groupIDToPartitions {
+			var consumerGroupTopicPartition kafka.ConsumerGroupTopicPartitions
+			consumerGroupTopicPartition.Group = k
+			consumerGroupTopicPartition.Partitions = v
+			res = append(res, consumerGroupTopicPartition)
+		}
+	} else {
+		return res, errors.New("nothing to read!!!")
+	}
+
+	return res, nil
+}
diff --git a/impl/parquet_writer.go b/impl/parquet_writer.go
index bfe8117..e631745 100644
--- a/impl/parquet_writer.go
+++ b/impl/parquet_writer.go
@@ -12,22 +12,32 @@ import (
 )
 
 type ParquetWriter struct {
-	parquetWriter *writer.ParquetWriter
-	fileWriter    source.ParquetFile
+	parquetWriterMessage *writer.ParquetWriter
+	parquetWriterOffset  *writer.ParquetWriter
+	fileWriterMessage    source.ParquetFile
+	fileWriterOffset     source.ParquetFile
 }
 
-func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error) {
-	parquetWriter, err := writer.NewParquetWriter(fileWriter, new(ParquetMessage), 4)
+func NewParquetWriter(fileWriterMessage, fileWriterOffset source.ParquetFile) (*ParquetWriter, error) {
+	parquetWriterMessage, err := writer.NewParquetWriter(fileWriterMessage, new(KafkaMessage), 9)
 	if err != nil {
 		return nil, errors.Wrap(err, "[NewParquetWriter]")
 	}
+
+	parquetWriterOffset, err := writer.NewParquetWriter(fileWriterOffset, new(OffsetMessage), 4)
+	if err != nil {
+		return nil, errors.Wrap(err, "[NewParquetWriter]")
+	}
+
 	return &ParquetWriter{
-		fileWriter:    fileWriter,
-		parquetWriter: parquetWriter,
+		fileWriterMessage:    fileWriterMessage,
+		parquetWriterMessage: parquetWriterMessage,
+		parquetWriterOffset:  parquetWriterOffset,
+		fileWriterOffset:     fileWriterOffset,
 	}, nil
 }
 
-type ParquetMessage struct {
+type KafkaMessage struct {
 	Value         string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
 	Topic         string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
 	Partition     int32  `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
@@ -39,12 +49,19 @@ type ParquetMessage struct {
 	Metadata      *string `parquet:"name=metadata, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
 }
 
+type OffsetMessage struct {
+	GroupID   string `parquet:"name=groupid, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+	Topic     string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+	Partition int32  `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
+	Offset    string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+}
+
 func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
 	headersBytes, err := json.Marshal(msg.Headers)
 	if err != nil {
 		return errors.Wrap(err, "Failed to marshal msg.Headers")
 	}
-	message := ParquetMessage{
+	message := KafkaMessage{
 		Value:     string(msg.Value),
 		Topic:     *msg.TopicPartition.Topic,
 		Partition: msg.TopicPartition.Partition,
@@ -56,21 +73,47 @@ func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
 		TimestampType: msg.TimestampType.String(),
 	}
 
-	err = f.parquetWriter.Write(message)
+	err = f.parquetWriterMessage.Write(message)
 	if err != nil {
 		return errors.Wrap(err, "[parquetWriter.Write]")
 	}
 	return err
 }
 
+func (f *ParquetWriter) OffsetWrite(msg kafka.ConsumerGroupTopicPartitions) (err error) {
+	for _, partition := range msg.Partitions {
+		message := OffsetMessage{
+			GroupID:   msg.Group,
+			Topic:     *partition.Topic,
+			Partition: partition.Partition,
+			Offset:    partition.Offset.String(),
+		}
+
+		err = f.parquetWriterOffset.Write(message)
+		if err != nil {
+			return errors.Wrap(err, "[parquetWriter.Write]")
+		}
+	}
+
+	return err
+}
+
 func (f *ParquetWriter) Flush() error {
-	err := f.parquetWriter.WriteStop()
+	err := f.parquetWriterMessage.WriteStop()
+	if err != nil {
+		return errors.Wrap(err, "[parquetWriterMessage.WriteStop()]")
+	}
+	err = f.parquetWriterOffset.WriteStop()
+	if err != nil {
+		return errors.Wrap(err, "[parquetWriterOffset.WriteStop()]")
+	}
+	err = f.fileWriterMessage.Close()
 	if err != nil {
-		return errors.Wrap(err, "[parquetWriter.WriteStop()]")
+		return errors.Wrap(err, "[fileWriterMessage.Close()]")
 	}
-	err = f.fileWriter.Close()
+	err = f.fileWriterOffset.Close()
 	if err != nil {
-		return errors.Wrap(err, "[fileWriter.Close()]")
+		return errors.Wrap(err, "[fileWriterOffset.Close()]")
 	}
 	log.Info("Flushed data to file")
 	return err
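
A minimal usage sketch for the new flag pair, assuming the tool builds to a `kafka-dump` binary and an `export` subcommand (neither name is shown in the hunks above); the flags themselves (`--file`/`-f`, `--offset-file`/`-o`, `--storage`, `--kafka-servers`) and the `import` and `count-parquet-rows` subcommands come directly from this diff, while the broker address, file names, and any topic/group selection flags are placeholders:

    # export messages to one parquet file and consumer-group offsets to another
    kafka-dump export --storage file -f messages.parquet -o offsets.parquet --kafka-servers localhost:9092

    # replay the messages and re-commit the stored consumer-group offsets
    kafka-dump import -f messages.parquet -o offsets.parquet --kafka-servers localhost:9092

    # count rows in both files
    kafka-dump count-parquet-rows -f messages.parquet -o offsets.parquet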