Skip to content

Commit

Permalink
feat: consumer_offset restore
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamcoc committed Sep 15, 2023
1 parent 29fd214 commit 6d68949
Show file tree
Hide file tree
Showing 13 changed files with 2,172 additions and 71 deletions.
7 changes: 6 additions & 1 deletion cmd/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func CreateExportCommand() (*cobra.Command, error) {
SSLCertLocation: sslCertLocation,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
adminClient, err := kafka_utils.NewAdminClient(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Unable to init admin client"))
}

consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
panic(errors.Wrap(err, "Unable to init consumer"))
Expand Down Expand Up @@ -95,7 +100,7 @@ func CreateExportCommand() (*cobra.Command, error) {
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file writer"))
}
exporter, err := impl.NewExporter(consumer, *topics, parquetWriter, options)
exporter, err := impl.NewExporter(adminClient, consumer, *topics, parquetWriter, options)
if err != nil {
panic(errors.Wrap(err, "Failed to init exporter"))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cmd
import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
"github.com/huantt/kafka-dump/pkg/log"
Expand Down
5 changes: 3 additions & 2 deletions cmd/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package cmd

import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/spf13/cobra"
"time"
)

func CreateStreamCmd() (*cobra.Command, error) {
Expand Down
48 changes: 25 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,42 @@ module github.com/huantt/kafka-dump
go 1.18

require (
cloud.google.com/go/storage v1.14.0
github.com/confluentinc/confluent-kafka-go v1.9.2
cloud.google.com/go/storage v1.29.0
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.6.1
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
google.golang.org/api v0.114.0
)

require (
cloud.google.com/go v0.75.0 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.40.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29 // indirect
google.golang.org/grpc v1.46.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
Loading

0 comments on commit 6d68949

Please sign in to comment.