From a60f3304785b36790f58c6985af8259ef1f2dd32 Mon Sep 17 00:00:00 2001 From: jack Date: Mon, 4 Dec 2023 17:01:24 +0700 Subject: [PATCH] fix: fix wrong mapping kafkaProducerConfig in streamer --- cmd/exporter.go | 2 +- cmd/importer.go | 4 +++- cmd/streamer.go | 13 ++++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/exporter.go b/cmd/exporter.go index d9663bd..3ab0f83 100644 --- a/cmd/exporter.go +++ b/cmd/exporter.go @@ -39,7 +39,7 @@ func CreateExportCommand() (*cobra.Command, error) { var sslKeyPassword string var sslCertLocation string var sslKeyLocation string - var enableAutoOffsetStore bool + var enableAutoOffsetStore = true command := cobra.Command{ Use: "export", diff --git a/cmd/importer.go b/cmd/importer.go index 4cf58ec..18a639b 100644 --- a/cmd/importer.go +++ b/cmd/importer.go @@ -23,6 +23,7 @@ func CreateImportCmd() (*cobra.Command, error) { var sslCertLocation string var sslKeyLocation string var includePartitionAndOffset bool + var enableAutoOffsetStore = true command := cobra.Command{ Use: "import", @@ -47,7 +48,7 @@ func CreateImportCmd() (*cobra.Command, error) { SSLKeyLocation: sslKeyLocation, SSLCertLocation: sslCertLocation, SSLKeyPassword: sslKeyPassword, - EnableAutoOffsetStore: false, + EnableAutoOffsetStore: enableAutoOffsetStore, } producer, err := kafka_utils.NewProducer(kafkaProducerConfig) if err != nil { @@ -88,5 +89,6 @@ func CreateImportCmd() (*cobra.Command, error) { command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location") command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key") command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file") + command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker") return &command, nil } diff --git a/cmd/streamer.go b/cmd/streamer.go index d1056d2..10b55c1 100644 --- a/cmd/streamer.go +++ b/cmd/streamer.go @@ -28,6 +28,7 @@ func CreateStreamCmd() (*cobra.Command, error) { var toTopic string var maxWaitingSecondsForNewMessage int + var enableAutoOffsetStore = true command := cobra.Command{ Use: "stream", @@ -45,11 +46,12 @@ func CreateStreamCmd() (*cobra.Command, error) { panic(err) } kafkaProducerConfig := kafka_utils.Config{ - BootstrapServers: fromKafkaServers, - SecurityProtocol: fromKafkaSecurityProtocol, - SASLMechanism: fromKafkaSASKMechanism, - SASLUsername: fromKafkaUsername, - SASLPassword: fromKafkaPassword, + BootstrapServers: toKafkaServers, + SecurityProtocol: toKafkaSecurityProtocol, + SASLMechanism: toKafkaSASKMechanism, + SASLUsername: toKafkaUsername, + SASLPassword: toKafkaPassword, + EnableAutoOffsetStore: enableAutoOffsetStore, } producer, err := kafka_utils.NewProducer(kafkaProducerConfig) if err != nil { @@ -105,6 +107,7 @@ func CreateStreamCmd() (*cobra.Command, error) { command.Flags().StringVar(&toKafkaSASKMechanism, "to-kafka-sasl-mechanism", "", "Destination Kafka password") command.Flags().StringVar(&toKafkaSecurityProtocol, "to-kafka-security-protocol", "", "Destination Kafka security protocol") command.Flags().StringVar(&toTopic, "to-topic", "", "Destination topic") + command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker") command.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever.")