enhancement: fixing documentation and ability to produce to internal topics
shubhamcoc committed Sep 12, 2023
1 parent 30efacc commit 29fd214
Showing 5 changed files with 49 additions and 27 deletions.
51 changes: 34 additions & 17 deletions README.md
@@ -21,6 +21,7 @@ Usage:

Flags:
--concurrent-consumers int Number of concurrent consumers (default 1)
--enable-auto-offset-store To store offset in kafka broker (default true)
--fetch-message-max-bytes int Maximum number of bytes per topic+partition to request when fetching messages from the broker. (default 1048576)
-f, --file string Output file path (required)
--gcs-bucket string Google Cloud Storage bucket name
@@ -37,10 +38,10 @@ Flags:
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
--queued-max-messages-kbytes int Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
--ssl-ca-location string location of client ca cert file in pem
--ssl-certificate-location string client's certificate location
--ssl-key-location string path to ssl private key
--ssl-key-password string password for ssl private key passphrase
--ssl-ca-location string Location of client ca cert file in pem
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to ssl private key
--ssl-key-password string Password for ssl private key passphrase
--storage string Storage type: local file (file) or Google cloud storage (gcs) (default "file")

Global Flags:
@@ -73,10 +74,10 @@ kafka-dump export \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN
--ssl-ca-location=<path to ssl cacert>
--ssl-certificate-location=<path to ssl cert>
--ssl-key-location=<path to ssl key>
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

@@ -88,16 +89,16 @@ Usage:
Flags:
-f, --file string Output file path (required)
-h, --help help for import
-i, --include-partition-and-offset to store partition and offset of kafka message in file
-i, --include-partition-and-offset To store partition and offset of kafka message in file
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
--ssl-ca-location string location of client ca cert file in pem
--ssl-certificate-location string client's certificate location
--ssl-key-location string path to ssl private key
--ssl-key-password string password for ssl private key passphrase
--ssl-ca-location string Location of client ca cert file in pem
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to ssl private key
--ssl-key-password string Password for ssl private key passphrase

Global Flags:
--log-level string Log level (default "info")
@@ -123,10 +124,26 @@ kafka-dump import \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN
--ssl-ca-location=<path to ssl cacert>
--ssl-certificate-location=<path to ssl cert>
--ssl-key-location=<path to ssl key>
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

- To produce data to a Kafka cluster's internal topics (e.g. "__consumer_offsets"), the producer's client ID must be set to `__admin_client`; Kafka brokers reject produce requests to internal topics from any other client ID.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--client-id=__admin_client \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

10 changes: 5 additions & 5 deletions cmd/exporter.go
@@ -124,11 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "To store offset in kafka broker")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
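The `--enable-auto-offset-store` flag registered above maps to librdkafka's `enable.auto.offset.store` consumer setting. Below is a minimal sketch of the pattern that disabling it enables (store an offset only after the message is safely persisted), assuming the confluent-kafka-go client; the `persist` callback is illustrative, not the tool's actual code:

```go
package kafka_utils

import "github.com/confluentinc/confluent-kafka-go/kafka"

// consumeAndStore reads one message, persists it, and only then stores the
// offset for the consumer's next auto-commit. With
// enable.auto.offset.store=false, a crash between fetch and persist cannot
// silently advance offsets past unwritten messages.
func consumeAndStore(consumer *kafka.Consumer, persist func(*kafka.Message) error) error {
	msg, err := consumer.ReadMessage(-1) // block until a message arrives
	if err != nil {
		return err
	}
	if err := persist(msg); err != nil { // e.g. append to the output file
		return err
	}
	_, err = consumer.StoreOffsets([]kafka.TopicPartition{{
		Topic:     msg.TopicPartition.Topic,
		Partition: msg.TopicPartition.Partition,
		Offset:    msg.TopicPartition.Offset + 1, // next offset to consume
	}})
	return err
}
```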
13 changes: 8 additions & 5 deletions cmd/importer.go
@@ -23,6 +23,7 @@ func CreateImportCmd() (*cobra.Command, error) {
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool
var clientid string

command := cobra.Command{
Use: "import",
@@ -48,6 +49,7 @@ func CreateImportCmd() (*cobra.Command, error) {
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
ClientID: clientid,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
@@ -83,10 +85,11 @@ func CreateImportCmd() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of client ca cert file in pem")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Password for ssl private key passphrase")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client's certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to ssl private key")
command.Flags().StringVar(&clientid, "client-id", "", "Producer client id")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "To store partition and offset of kafka message in file")
return &command, nil
}
1 change: 1 addition & 0 deletions pkg/kafka_utils/config.go
@@ -2,6 +2,7 @@ package kafka_utils

type Config struct {
BootstrapServers string `json:"bootstrap_servers" mapstructure:"bootstrap_servers"`
ClientID string `json:"client_id" mapstructure:"client_id"`
SecurityProtocol string `json:"security_protocol" mapstructure:"security_protocol"`
SASLMechanism string `json:"sasl_mechanism" mapstructure:"sasl_mechanism"`
SASLUsername string `json:"sasl_username" mapstructure:"sasl_username"`
1 change: 1 addition & 0 deletions pkg/kafka_utils/producer.go
@@ -12,6 +12,7 @@ func NewProducer(cfg Config) (*kafka.Producer, error) {
config := kafka.ConfigMap{
"bootstrap.servers": cfg.BootstrapServers,
"queue.buffering.max.messages": queueBufferingMaxMessages, // librdkafka's default value,
"client.id": cfg.ClientID,
}
if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
err := config.SetKey("security.protocol", cfg.SecurityProtocol)
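Putting the producer changes together, a minimal sketch of the configuration they produce, assuming the confluent-kafka-go client (values are illustrative):

```go
package main

import "github.com/confluentinc/confluent-kafka-go/kafka"

func main() {
	// Mirrors NewProducer above: cfg.ClientID is passed straight through to
	// "client.id", so importing into internal topics requires running the
	// tool with --client-id=__admin_client.
	config := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id":         "__admin_client", // needed to produce to internal topics
	}
	producer, err := kafka.NewProducer(&config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
}
```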
