From 3d844b6211e6418e3bc1cb64d71f53f66fc24f9a Mon Sep 17 00:00:00 2001 From: Shubham Date: Wed, 20 Sep 2023 17:19:13 +0530 Subject: [PATCH] feat: fixing bug for consumer offset --- Dockerfile | 4 ++-- cmd/importer.go | 6 +----- impl/exporter.go | 6 ++++-- impl/parquet_reader.go | 33 ++++++++++++++++++++++++--------- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6211aaa..0017052 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,11 +5,11 @@ COPY go.mod . COPY go.sum . RUN go mod download COPY . . -RUN GOOS=linux go build -o kafka-dump +RUN GOOS=linux GOARCH=amd64 go build -o kafka-dump FROM ubuntu:20.04 RUN apt-get update && apt-get install -y ca-certificates RUN mkdir /app WORKDIR /app COPY --from=builder /app/kafka-dump . -CMD ./kafka-dump export \ No newline at end of file +CMD ./kafka-dump export diff --git a/cmd/importer.go b/cmd/importer.go index fd6ff51..3a3baa6 100644 --- a/cmd/importer.go +++ b/cmd/importer.go @@ -98,7 +98,7 @@ func CreateImportCmd() (*cobra.Command, error) { }, } command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)") - command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset (required)") + command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset") command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string") command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username") command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password") @@ -117,9 +117,5 @@ func CreateImportCmd() (*cobra.Command, error) { if err != nil { return nil, err } - err = command.MarkFlagRequired("offset-file") - if err != nil { - return nil, err - } return &command, nil } diff --git a/impl/exporter.go b/impl/exporter.go index eff0e72..2f94d11 100644 --- a/impl/exporter.go +++ b/impl/exporter.go @@ -142,16 +142,18 @@ func (e *Exporter) StoreConsumerGroupOffset() error { } log.Infof("group result is: %v", groupRes) - // improve the complexity + // TODO: improve the complexity for _, groupDescription := range groupRes.ConsumerGroupDescriptions { log.Infof("group description is: %v", groupDescription) for _, member := range groupDescription.Members { log.Infof("member is: %v", member) for _, groupTopic := range member.Assignment.TopicPartitions { log.Infof("group topic is: %s", *groupTopic.Topic) - log.Infof("group description groupid: %s", groupDescription.GroupID) for _, topic := range e.topics { log.Infof("topic is: %s", topic) + // Matching the topic with the provided topic + // as we don't want to store offset for all the + // consumer group if *groupTopic.Topic == topic { consumerGroupList[groupDescription.GroupID] = struct{}{} } diff --git a/impl/parquet_reader.go b/impl/parquet_reader.go index 4676313..8c2e5f1 100644 --- a/impl/parquet_reader.go +++ b/impl/parquet_reader.go @@ -21,6 +21,8 @@ type ParquetReader struct { } func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAndOffset bool) (*ParquetReader, error) { + var fileReaderOffset source.ParquetFile + var parquetReaderOffset *reader.ParquetReader fileReaderMessage, err := local.NewLocalFileReader(filePathMessage) if err != nil { return nil, errors.Wrap(err, "Failed to init file reader") @@ -31,14 +33,16 @@ func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAn return nil, errors.Wrap(err, "Failed to init parquet reader") } - fileReaderOffset, err := local.NewLocalFileReader(filePathOffset) - if err != nil { - return nil, errors.Wrap(err, "Failed to init file reader") - } + if filePathOffset != "" { + fileReaderOffset, err = local.NewLocalFileReader(filePathOffset) + if err != nil { + return nil, errors.Wrap(err, "Failed to init file reader") + } - parquetReaderOffset, err := reader.NewParquetReader(fileReaderOffset, new(OffsetMessage), 4) - if err != nil { - return nil, errors.Wrap(err, "Failed to init parquet reader") + parquetReaderOffset, err = reader.NewParquetReader(fileReaderOffset, new(OffsetMessage), 4) + if err != nil { + return nil, errors.Wrap(err, "Failed to init parquet reader") + } } return &ParquetReader{ fileReaderMessage: fileReaderMessage, @@ -52,8 +56,11 @@ func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAn const batchSize = 10 func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message { - rowNum := int(p.parquetReaderMessage.GetNumRows()) ch := make(chan kafka.Message, batchSize) + if p.parquetReaderMessage == nil { + return ch + } + rowNum := int(p.parquetReaderMessage.GetNumRows()) counter := 0 go func() { for i := 0; i < rowNum/batchSize+1; i++ { @@ -87,9 +94,17 @@ func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan } func (p *ParquetReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions { - rowNum := int(p.parquetReaderOffset.GetNumRows()) ch := make(chan kafka.ConsumerGroupTopicPartitions, batchSize) + // When offset file is not given + if p.parquetReaderOffset == nil { + return ch + } + rowNum := int(p.parquetReaderOffset.GetNumRows()) counter := 0 + // When offset file is empty + if rowNum == 0 { + return ch + } go func() { for i := 0; i < rowNum/batchSize+1; i++ { offsetMessages := make([]OffsetMessage, batchSize)