From 29fd214f506ae69b6cbfb3f6a81f50ce01da63cd Mon Sep 17 00:00:00 2001
From: Shubham
Date: Tue, 12 Sep 2023 20:45:44 +0530
Subject: [PATCH] enhancement: fix documentation and add ability to produce to
 internal topics

---
 README.md                   | 51 ++++++++++++++++++++++++-------------
 cmd/exporter.go             | 10 ++++----
 cmd/importer.go             | 13 ++++++----
 pkg/kafka_utils/config.go   |  1 +
 pkg/kafka_utils/producer.go |  1 +
 5 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/README.md b/README.md
index 1d9eb14..c5ad592 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Usage:
 
 Flags:
       --concurrent-consumers int            Number of concurrent consumers (default 1)
+      --enable-auto-offset-store            To store offset in kafka broker (default true)
       --fetch-message-max-bytes int         Maximum number of bytes per topic+partition to request when fetching messages from the broker. (default 1048576)
   -f, --file string                         Output file path (required)
       --gcs-bucket string                   Google Cloud Storage bucket name
@@ -37,10 +38,10 @@ Flags:
       --limit uint                          Supports file splitting. Files are split by the number of messages specified
       --max-waiting-seconds-for-new-message int   Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
       --queued-max-messages-kbytes int      Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
-      --ssl-ca-location string              location of client ca cert file in pem
-      --ssl-certificate-location string     client's certificate location
-      --ssl-key-location string             path to ssl private key
-      --ssl-key-password string             password for ssl private key passphrase
+      --ssl-ca-location string              Location of client ca cert file in pem
+      --ssl-certificate-location string     Client's certificate location
+      --ssl-key-location string             Path to ssl private key
+      --ssl-key-password string             Password for ssl private key passphrase
       --storage string                      Storage type: local file (file) or Google cloud storage (gcs) (default "file")
 
 Global Flags:
@@ -73,10 +74,10 @@ kafka-dump export \
 --kafka-username=admin \
 --kafka-password=admin \
 --kafka-security-protocol=SSL \
---kafka-sasl-mechanism=PLAIN
---ssl-ca-location=
---ssl-certificate-location=
---ssl-key-location=
+--kafka-sasl-mechanism=PLAIN \
+--ssl-ca-location= \
+--ssl-certificate-location= \
+--ssl-key-location= \
 --ssl-key-password=
 ```
 
@@ -88,16 +89,16 @@ Usage:
 Flags:
   -f, --file string                       Output file path (required)
   -h, --help                              help for import
-  -i, --include-partition-and-offset      to store partition and offset of kafka message in file
+  -i, --include-partition-and-offset      To store partition and offset of kafka message in file
       --kafka-password string             Kafka password
       --kafka-sasl-mechanism string       Kafka password
       --kafka-security-protocol string    Kafka security protocol
       --kafka-servers string              Kafka servers string
       --kafka-username string             Kafka username
-      --ssl-ca-location string            location of client ca cert file in pem
-      --ssl-certificate-location string   client's certificate location
-      --ssl-key-location string           path to ssl private key
-      --ssl-key-password string           password for ssl private key passphrase
+      --ssl-ca-location string            Location of client ca cert file in pem
+      --ssl-certificate-location string   Client's certificate location
+      --ssl-key-location string           Path to ssl private key
+      --ssl-key-password string           Password for ssl private key passphrase
 
 Global Flags:
       --log-level string   Log level (default "info")
@@ -123,10 +124,26 @@ kafka-dump import \
 --kafka-username=admin \
 --kafka-password=admin \
 --kafka-security-protocol=SSL \
---kafka-sasl-mechanism=PLAIN
---ssl-ca-location=
---ssl-certificate-location=
---ssl-key-location=
+--kafka-sasl-mechanism=PLAIN \
+--ssl-ca-location= \
+--ssl-certificate-location= \
+--ssl-key-location= \
+--ssl-key-password=
+```
+
+- In order to produce data to the internal topics of a Kafka cluster (e.g. "__consumer_offsets"), the client id of the producer needs to be configured as `__admin_client`.
+```shell
+kafka-dump import \
+--file=path/to/input/data.parquet \
+--kafka-servers=localhost:9092 \
+--kafka-username=admin \
+--kafka-password=admin \
+--kafka-security-protocol=SSL \
+--kafka-sasl-mechanism=PLAIN \
+--client-id=__admin_client \
+--ssl-ca-location= \
+--ssl-certificate-location= \
+--ssl-key-location= \
 --ssl-key-password=
 ```
 
diff --git a/cmd/exporter.go b/cmd/exporter.go
index d9663bd..48e2270 100644
--- a/cmd/exporter.go
+++ b/cmd/exporter.go
@@ -124,11 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
 	command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
 	command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
 	command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
-	command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
-	command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
-	command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
-	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
-	command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")
+	command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
+	command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
+	command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
+	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
+	command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "To store offset in kafka broker")
 	command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
 	command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
 	command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
diff --git a/cmd/importer.go b/cmd/importer.go
index 4cf58ec..db06e92 100644
--- a/cmd/importer.go
+++ b/cmd/importer.go
@@ -23,6 +23,7 @@ func CreateImportCmd() (*cobra.Command, error) {
 	var sslCertLocation string
 	var sslKeyLocation string
 	var includePartitionAndOffset bool
+	var clientID string
 
 	command := cobra.Command{
 		Use: "import",
@@ -48,6 +49,7 @@ func CreateImportCmd() (*cobra.Command, error) {
 				SSLCertLocation:       sslCertLocation,
 				SSLKeyPassword:        sslKeyPassword,
 				EnableAutoOffsetStore: false,
+				ClientID:              clientID,
 			}
 			producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
 			if err != nil {
@@ -83,10 +85,11 @@ func CreateImportCmd() (*cobra.Command, error) {
 	command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
 	command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
 	command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
-	command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
-	command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
-	command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
-	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
-	command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file")
+	command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
+	command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
+	command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
+	command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
+	command.Flags().StringVar(&clientID, "client-id", "", "Producer client ID")
+	command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
 	return &command, nil
 }
diff --git a/pkg/kafka_utils/config.go b/pkg/kafka_utils/config.go
index cb07d22..821cf07 100644
--- a/pkg/kafka_utils/config.go
+++ b/pkg/kafka_utils/config.go
@@ -2,6 +2,7 @@ package kafka_utils
 
 type Config struct {
 	BootstrapServers      string `json:"bootstrap_servers" mapstructure:"bootstrap_servers"`
+	ClientID              string `json:"client_id" mapstructure:"client_id"`
 	SecurityProtocol      string `json:"security_protocol" mapstructure:"security_protocol"`
 	SASLMechanism         string `json:"sasl_mechanism" mapstructure:"sasl_mechanism"`
 	SASLUsername          string `json:"sasl_username" mapstructure:"sasl_username"`
diff --git a/pkg/kafka_utils/producer.go b/pkg/kafka_utils/producer.go
index a0185d1..c712a54 100644
--- a/pkg/kafka_utils/producer.go
+++ b/pkg/kafka_utils/producer.go
@@ -12,6 +12,7 @@ func NewProducer(cfg Config) (*kafka.Producer, error) {
 	config := kafka.ConfigMap{
 		"bootstrap.servers":            cfg.BootstrapServers,
 		"queue.buffering.max.messages": queueBufferingMaxMessages, // librdkafka's default value,
+		"client.id":                    cfg.ClientID,
 	}
 	if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
 		err := config.SetKey("security.protocol", cfg.SecurityProtocol)