fix: set enableAutoOffsetStore for consumer
huantt committed Dec 4, 2023
1 parent a60f330 commit 0067df4
Showing 2 changed files with 17 additions and 17 deletions.
cmd/exporter.go: 2 changes (1 addition & 1 deletion)
@@ -39,7 +39,7 @@ func CreateExportCommand() (*cobra.Command, error) {
     var sslKeyPassword string
     var sslCertLocation string
     var sslKeyLocation string
-    var enableAutoOffsetStore = true
+    var enableAutoOffsetStore bool
 
     command := cobra.Command{
         Use: "export",
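The exporter change swaps the hard-coded initializer `var enableAutoOffsetStore = true` for a bare `bool` declaration, so the default must now come from wherever the flag is bound. A minimal, self-contained sketch of that wiring with cobra (the flag name, default, and help text here are assumptions; the actual registration sits outside this hunk):

    package main

    import (
        "fmt"

        "github.com/spf13/cobra"
    )

    func main() {
        var enableAutoOffsetStore bool

        command := cobra.Command{
            Use: "export",
            Run: func(cmd *cobra.Command, args []string) {
                fmt.Println("enable-auto-offset-store =", enableAutoOffsetStore)
            },
        }
        // Hypothetical flag binding (not shown in this diff): the `true` default
        // moves out of the variable initializer and into the flag definition.
        command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true,
            "store consumed offsets automatically after delivery to the application")

        if err := command.Execute(); err != nil {
            panic(err)
        }
    }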
cmd/streamer.go: 32 changes (16 additions & 16 deletions)
@@ -34,33 +34,33 @@ func CreateStreamCmd() (*cobra.Command, error) {
Use: "stream",
Run: func(cmd *cobra.Command, args []string) {
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: fromKafkaServers,
SecurityProtocol: fromKafkaSecurityProtocol,
SASLMechanism: fromKafkaSASKMechanism,
SASLUsername: fromKafkaUsername,
SASLPassword: fromKafkaPassword,
GroupId: fromKafkaGroupID,
BootstrapServers: fromKafkaServers,
SecurityProtocol: fromKafkaSecurityProtocol,
SASLMechanism: fromKafkaSASKMechanism,
SASLUsername: fromKafkaUsername,
SASLPassword: fromKafkaPassword,
GroupId: fromKafkaGroupID,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
panic(err)
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: toKafkaServers,
SecurityProtocol: toKafkaSecurityProtocol,
SASLMechanism: toKafkaSASKMechanism,
SASLUsername: toKafkaUsername,
SASLPassword: toKafkaPassword,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
panic(err)
BootstrapServers: toKafkaServers,
SecurityProtocol: toKafkaSecurityProtocol,
SASLMechanism: toKafkaSASKMechanism,
SASLUsername: toKafkaUsername,
SASLPassword: toKafkaPassword,
}
queueBufferingMaxMessages := kafka_utils.DefaultQueueBufferingMaxMessages
if kafkaProducerConfig.QueueBufferingMaxMessages > 0 {
queueBufferingMaxMessages = kafkaProducerConfig.QueueBufferingMaxMessages
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
panic(err)
}
deliveryChan := make(chan kafka.Event, queueBufferingMaxMessages)
go func() { // Tricky: kafka require specific deliveryChan to use Flush function
for e := range deliveryChan {
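The substance of the fix is visible in the two config literals above: `EnableAutoOffsetStore` was previously set on the producer config, where an offset-store option has no effect, and is now passed to the consumer config, where it presumably maps to librdkafka's `enable.auto.offset.store` consumer property. With that property set to false, the application stores an offset only after it has finished handling the message, so auto-commit can never commit offsets for messages that were not fully processed. A sketch of that pattern against confluent-kafka-go directly (broker, group, and topic are placeholders; this is not the repository's `kafka_utils` wrapper):

    package main

    import (
        "fmt"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka" // adjust to the module version in use
    )

    func main() {
        // enable.auto.offset.store=false: librdkafka no longer marks offsets
        // for commit on its own; the application stores them explicitly.
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":        "localhost:9092", // placeholder broker
            "group.id":                 "example-group",  // placeholder group
            "enable.auto.offset.store": false,
        })
        if err != nil {
            panic(err)
        }
        defer consumer.Close()

        if err := consumer.Subscribe("example-topic", nil); err != nil { // placeholder topic
            panic(err)
        }

        for {
            msg, err := consumer.ReadMessage(-1) // block until a message arrives
            if err != nil {
                fmt.Println("read error:", err)
                continue
            }
            process(msg) // handle the message first...
            // ...then store its offset, so the periodic auto-commit only ever
            // commits offsets of messages that were fully processed.
            if _, err := consumer.StoreMessage(msg); err != nil {
                fmt.Println("store offset error:", err)
            }
        }
    }

    func process(msg *kafka.Message) {
        fmt.Printf("processed %q\n", string(msg.Value))
    }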
