feat: fix bug in consumer offset handling
shubhamcoc committed Sep 20, 2023
1 parent 85c2ac7 commit 3d844b6
Showing 4 changed files with 31 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -5,11 +5,11 @@ COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
RUN GOOS=linux go build -o kafka-dump
RUN GOOS=linux GOARCH=amd64 go build -o kafka-dump

FROM ubuntu:20.04
RUN apt-get update && apt-get install -y ca-certificates
RUN mkdir /app
WORKDIR /app
COPY --from=builder /app/kafka-dump .
CMD ./kafka-dump export
CMD ./kafka-dump export
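
The build stage now pins GOARCH=amd64 alongside GOOS=linux, so the compiled binary matches the amd64 ubuntu:20.04 runtime image even when the image is built on a host with a different architecture. A minimal Go sketch (not part of this repo) showing how a binary can report the target it was compiled for; recent toolchains also expose the same build settings via `go version -m`:

```go
package main

import (
	"fmt"
	"runtime"
)

// Prints the target OS/architecture baked into this binary at build time,
// e.g. "linux/amd64" when built with GOOS=linux GOARCH=amd64.
func main() {
	fmt.Printf("built for %s/%s\n", runtime.GOOS, runtime.GOARCH)
}
```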
6 changes: 1 addition & 5 deletions cmd/importer.go
@@ -98,7 +98,7 @@ func CreateImportCmd() (*cobra.Command, error) {
},
}
command.Flags().StringVarP(&messageFilePath, "file", "f", "", "Output file path for storing message (required)")
command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset (required)")
command.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Output file path for storing offset")
command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
@@ -117,9 +117,5 @@ func CreateImportCmd() (*cobra.Command, error) {
if err != nil {
return nil, err
}
err = command.MarkFlagRequired("offset-file")
if err != nil {
return nil, err
}
return &command, nil
}
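
Dropping the MarkFlagRequired("offset-file") call makes the offset file optional at the CLI level; leaving --offset-file empty now signals that consumer group offsets should not be restored. A minimal cobra sketch of the same pattern, using hypothetical command and flag wiring rather than the project's real importer:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var messageFilePath, offsetFilePath string

	// Hypothetical stand-in for the real import command in cmd/importer.go.
	cmd := &cobra.Command{
		Use: "import",
		RunE: func(cmd *cobra.Command, args []string) error {
			if offsetFilePath == "" {
				fmt.Println("no offset file given; skipping consumer group offsets")
			}
			fmt.Println("importing messages from", messageFilePath)
			return nil
		},
	}

	cmd.Flags().StringVarP(&messageFilePath, "file", "f", "", "Input file path for messages (required)")
	cmd.Flags().StringVarP(&offsetFilePath, "offset-file", "o", "", "Input file path for offsets (optional)")

	// Only the message file stays mandatory; offset-file is no longer marked required.
	if err := cmd.MarkFlagRequired("file"); err != nil {
		panic(err)
	}

	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
```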
6 changes: 4 additions & 2 deletions impl/exporter.go
@@ -142,16 +142,18 @@ func (e *Exporter) StoreConsumerGroupOffset() error {
}
log.Infof("group result is: %v", groupRes)

// improve the complexity
// TODO: improve the complexity
for _, groupDescription := range groupRes.ConsumerGroupDescriptions {
log.Infof("group description is: %v", groupDescription)
for _, member := range groupDescription.Members {
log.Infof("member is: %v", member)
for _, groupTopic := range member.Assignment.TopicPartitions {
log.Infof("group topic is: %s", *groupTopic.Topic)
log.Infof("group description groupid: %s", groupDescription.GroupID)
for _, topic := range e.topics {
log.Infof("topic is: %s", topic)
// Match the topic against the provided topics,
// as we don't want to store offsets for all
// consumer groups
if *groupTopic.Topic == topic {
consumerGroupList[groupDescription.GroupID] = struct{}{}
}
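
The TODO above refers to the nested loops: every partition of every member's assignment is checked against the full e.topics slice. One way to reduce this, sketched below with stripped-down stand-in types for the confluent-kafka-go admin structures (only the fields the loop touches), is to build a set of the exported topics once and do a map lookup per partition:

```go
package main

import "fmt"

// Reduced, hypothetical shapes mirroring the fields used in exporter.go.
type TopicPartition struct{ Topic *string }
type MemberAssignment struct{ TopicPartitions []TopicPartition }
type MemberDescription struct{ Assignment MemberAssignment }
type ConsumerGroupDescription struct {
	GroupID string
	Members []MemberDescription
}

// matchGroups returns the consumer groups whose assignments include one of
// the exported topics. The topic set makes the per-partition check O(1)
// instead of a scan over the whole topics slice.
func matchGroups(descs []ConsumerGroupDescription, topics []string) map[string]struct{} {
	topicSet := make(map[string]struct{}, len(topics))
	for _, t := range topics {
		topicSet[t] = struct{}{}
	}
	groups := make(map[string]struct{})
	for _, d := range descs {
		for _, m := range d.Members {
			for _, tp := range m.Assignment.TopicPartitions {
				if tp.Topic == nil {
					continue
				}
				if _, ok := topicSet[*tp.Topic]; ok {
					groups[d.GroupID] = struct{}{}
				}
			}
		}
	}
	return groups
}

func main() {
	topic := "orders" // hypothetical topic name
	descs := []ConsumerGroupDescription{{
		GroupID: "billing",
		Members: []MemberDescription{{Assignment: MemberAssignment{
			TopicPartitions: []TopicPartition{{Topic: &topic}},
		}}},
	}}
	fmt.Println(matchGroups(descs, []string{"orders"})) // map[billing:{}]
}
```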
33 changes: 24 additions & 9 deletions impl/parquet_reader.go
@@ -21,6 +21,8 @@ type ParquetReader struct {
}

func NewParquetReader(filePathMessage, filePathOffset string, includePartitionAndOffset bool) (*ParquetReader, error) {
var fileReaderOffset source.ParquetFile
var parquetReaderOffset *reader.ParquetReader
fileReaderMessage, err := local.NewLocalFileReader(filePathMessage)
if err != nil {
return nil, errors.Wrap(err, "Failed to init file reader")
@@ -31,14 +33,16 @@ return nil, errors.Wrap(err, "Failed to init parquet reader")
return nil, errors.Wrap(err, "Failed to init parquet reader")
}

fileReaderOffset, err := local.NewLocalFileReader(filePathOffset)
if err != nil {
return nil, errors.Wrap(err, "Failed to init file reader")
}
if filePathOffset != "" {
fileReaderOffset, err = local.NewLocalFileReader(filePathOffset)
if err != nil {
return nil, errors.Wrap(err, "Failed to init file reader")
}

parquetReaderOffset, err := reader.NewParquetReader(fileReaderOffset, new(OffsetMessage), 4)
if err != nil {
return nil, errors.Wrap(err, "Failed to init parquet reader")
parquetReaderOffset, err = reader.NewParquetReader(fileReaderOffset, new(OffsetMessage), 4)
if err != nil {
return nil, errors.Wrap(err, "Failed to init parquet reader")
}
}
return &ParquetReader{
fileReaderMessage: fileReaderMessage,
@@ -52,8 +56,11 @@ const batchSize = 10
const batchSize = 10

func (p *ParquetReader) ReadMessage(restoreBefore, restoreAfter time.Time) chan kafka.Message {
rowNum := int(p.parquetReaderMessage.GetNumRows())
ch := make(chan kafka.Message, batchSize)
if p.parquetReaderMessage == nil {
return ch
}
rowNum := int(p.parquetReaderMessage.GetNumRows())
counter := 0
go func() {
for i := 0; i < rowNum/batchSize+1; i++ {
@@ -87,9 +94,17 @@ }
}

func (p *ParquetReader) ReadOffset() chan kafka.ConsumerGroupTopicPartitions {
rowNum := int(p.parquetReaderOffset.GetNumRows())
ch := make(chan kafka.ConsumerGroupTopicPartitions, batchSize)
// When offset file is not given
if p.parquetReaderOffset == nil {
return ch
}
rowNum := int(p.parquetReaderOffset.GetNumRows())
counter := 0
// When offset file is empty
if rowNum == 0 {
return ch
}
go func() {
for i := 0; i < rowNum/batchSize+1; i++ {
offsetMessages := make([]OffsetMessage, batchSize)
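
With these changes the offset file is optional end to end: the constructor only opens it when a path was supplied, and ReadMessage/ReadOffset return early with an empty channel instead of dereferencing a nil reader. A self-contained sketch of the same guarded construction using the xitongsys/parquet-go packages the file already imports; OffsetMessage here is a reduced stand-in, not the project's real schema:

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
	"github.com/xitongsys/parquet-go/source"
)

// Hypothetical, reduced offset record; the real struct lives in kafka-dump.
type OffsetMessage struct {
	Offset int64 `parquet:"name=offset, type=INT64"`
}

// openOptionalOffsetReader mirrors the guard in NewParquetReader: the offset
// file reader and parquet reader are only built when a path is given.
func openOptionalOffsetReader(path string) (source.ParquetFile, *reader.ParquetReader, error) {
	if path == "" {
		return nil, nil, nil // no offset file: both readers stay nil
	}
	f, err := local.NewLocalFileReader(path)
	if err != nil {
		return nil, nil, err
	}
	r, err := reader.NewParquetReader(f, new(OffsetMessage), 4)
	if err != nil {
		return nil, nil, err
	}
	return f, r, nil
}

func main() {
	_, r, err := openOptionalOffsetReader("") // empty path: nothing is opened
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("offset reader present: %v", r != nil)
}
```

Callers that later use the offset reader can treat a nil value as "no offsets to restore", which is exactly how ReadOffset now behaves when no file was given.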
