Skip to content

Commit

Permalink
fix: fix wrong mapping kafkaProducerConfig in streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
huantt committed Dec 4, 2023
1 parent ea2216c commit a60f330
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func CreateExportCommand() (*cobra.Command, error) {
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var enableAutoOffsetStore bool
var enableAutoOffsetStore = true

command := cobra.Command{
Use: "export",
Expand Down
4 changes: 3 additions & 1 deletion cmd/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func CreateImportCmd() (*cobra.Command, error) {
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool
var enableAutoOffsetStore = true

command := cobra.Command{
Use: "import",
Expand All @@ -47,7 +48,7 @@ func CreateImportCmd() (*cobra.Command, error) {
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
Expand Down Expand Up @@ -88,5 +89,6 @@ func CreateImportCmd() (*cobra.Command, error) {
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")
return &command, nil
}
13 changes: 8 additions & 5 deletions cmd/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func CreateStreamCmd() (*cobra.Command, error) {
var toTopic string

var maxWaitingSecondsForNewMessage int
var enableAutoOffsetStore = true

command := cobra.Command{
Use: "stream",
Expand All @@ -45,11 +46,12 @@ func CreateStreamCmd() (*cobra.Command, error) {
panic(err)
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: fromKafkaServers,
SecurityProtocol: fromKafkaSecurityProtocol,
SASLMechanism: fromKafkaSASKMechanism,
SASLUsername: fromKafkaUsername,
SASLPassword: fromKafkaPassword,
BootstrapServers: toKafkaServers,
SecurityProtocol: toKafkaSecurityProtocol,
SASLMechanism: toKafkaSASKMechanism,
SASLUsername: toKafkaUsername,
SASLPassword: toKafkaPassword,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
Expand Down Expand Up @@ -105,6 +107,7 @@ func CreateStreamCmd() (*cobra.Command, error) {
command.Flags().StringVar(&toKafkaSASKMechanism, "to-kafka-sasl-mechanism", "", "Destination Kafka password")
command.Flags().StringVar(&toKafkaSecurityProtocol, "to-kafka-security-protocol", "", "Destination Kafka security protocol")
command.Flags().StringVar(&toTopic, "to-topic", "", "Destination topic")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")

command.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever.")

Expand Down

0 comments on commit a60f330

Please sign in to comment.