diff --git a/README.md b/README.md
index 013bbb2..1d9eb14 100644
--- a/README.md
+++ b/README.md
@@ -37,12 +37,18 @@ Flags:
       --limit uint                                Supports file splitting. Files are split by the number of messages specified
       --max-waiting-seconds-for-new-message int   Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
       --queued-max-messages-kbytes int            Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
+      --ssl-ca-location string                    location of client ca cert file in pem
+      --ssl-certificate-location string           client's certificate location
+      --ssl-key-location string                   path to ssl private key
+      --ssl-key-password string                   password for ssl private key passphrase
       --storage string                            Storage type: local file (file) or Google cloud storage (gcs) (default "file")
 
 Global Flags:
       --log-level string   Log level (default "info")
 ```
 
 #### Sample
+
+- Connect to a Kafka cluster without SSL encryption enabled to export data.
 ```shell
 kafka-dump export \
 --storage=file
 --file=path/to/output/data.parquet \
@@ -56,24 +62,49 @@ kafka-dump export \
 --kafka-sasl-mechanism=PLAIN
 ```
 
+- Connect to a Kafka cluster with SSL encryption enabled to export data.
+```shell
+kafka-dump export \
+--storage=file \
+--file=path/to/output/data.parquet \
+--kafka-topics=users-activities \
+--kafka-group-id=kafka-dump.local \
+--kafka-servers=localhost:9092 \
+--kafka-username=admin \
+--kafka-password=admin \
+--kafka-security-protocol=SSL \
+--kafka-sasl-mechanism=PLAIN \
+--ssl-ca-location= \
+--ssl-certificate-location= \
+--ssl-key-location= \
+--ssl-key-password=
+```
+
 ### Import Kafka topics from parquet file
 ```shell
 Usage:
   import [flags]
 
 Flags:
-  -f, --file string                        Output file path (required)
-  -h, --help                               help for import
-      --kafka-password string              Kafka password
-      --kafka-sasl-mechanism string        Kafka password
-      --kafka-security-protocol string     Kafka security protocol
-      --kafka-servers string               Kafka servers string
-      --kafka-username string              Kafka username
+  -f, --file string                         Output file path (required)
+  -h, --help                                help for import
+  -i, --include-partition-and-offset        to store partition and offset of kafka message in file
+      --kafka-password string               Kafka password
+      --kafka-sasl-mechanism string         Kafka password
+      --kafka-security-protocol string      Kafka security protocol
+      --kafka-servers string                Kafka servers string
+      --kafka-username string               Kafka username
+      --ssl-ca-location string              location of client ca cert file in pem
+      --ssl-certificate-location string     client's certificate location
+      --ssl-key-location string             path to ssl private key
+      --ssl-key-password string             password for ssl private key passphrase
 
 Global Flags:
       --log-level string   Log level (default "info")
 ```
 
 #### Sample
+
+- Connect to a Kafka cluster without SSL encryption enabled to import data.
 ```shell
 kafka-dump import \
 --file=path/to/input/data.parquet \
@@ -84,6 +115,21 @@ kafka-dump import \
 --kafka-sasl-mechanism=PLAIN
 ```
 
+- Connect to a Kafka cluster with SSL encryption enabled to import data.
+```shell
+kafka-dump import \
+--file=path/to/input/data.parquet \
+--kafka-servers=localhost:9092 \
+--kafka-username=admin \
+--kafka-password=admin \
+--kafka-security-protocol=SSL \
+--kafka-sasl-mechanism=PLAIN \
+--ssl-ca-location= \
+--ssl-certificate-location= \
+--ssl-key-location= \
+--ssl-key-password=
+```
+
 ### Stream messages topic to topic
 ```shell
 Usage:
@@ -166,4 +212,4 @@ kafka-dump export \
 ## TODO
 - Import topics from multiple files or directory
-- Import topics from Google Cloud Storage files or directory
\ No newline at end of file
+- Import topics from Google Cloud Storage files or directory
diff --git a/cmd/exporter.go b/cmd/exporter.go
index a074a4d..6579410 100644
--- a/cmd/exporter.go
+++ b/cmd/exporter.go
@@ -3,6 +3,9 @@ package cmd
 import (
     "context"
     "fmt"
+    "sync"
+    "time"
+
     "github.com/huantt/kafka-dump/impl"
     "github.com/huantt/kafka-dump/pkg/gcs_utils"
     "github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -12,8 +15,6 @@ import (
     "github.com/xitongsys/parquet-go-source/gcs"
     "github.com/xitongsys/parquet-go-source/local"
     "github.com/xitongsys/parquet-go/source"
-    "sync"
-    "time"
 )
 
 func CreateExportCommand() (*cobra.Command, error) {
@@ -34,18 +35,28 @@ func CreateExportCommand() (*cobra.Command, error) {
     var storageType string
     var gcsBucketName string
     var gcsProjectID string
+    var sslCaLocation string
+    var sslKeyPassword string
+    var sslCertLocation string
+    var sslKeyLocation string
+    var enableAutoOffsetStore bool
     command := cobra.Command{
         Use: "export",
         Run: func(cmd *cobra.Command, args []string) {
             log.Infof("Limit: %d - Concurrent consumers: %d", exportLimitPerFile, concurrentConsumers)
             kafkaConsumerConfig := kafka_utils.Config{
-                BootstrapServers: kafkaServers,
-                SecurityProtocol: kafkaSecurityProtocol,
-                SASLMechanism:    kafkaSASKMechanism,
-                SASLUsername:     kafkaUsername,
-                SASLPassword:     kafkaPassword,
-                GroupId:          kafkaGroupID,
+                BootstrapServers:      kafkaServers,
+                SecurityProtocol:      kafkaSecurityProtocol,
+                SASLMechanism:         kafkaSASKMechanism,
+                SASLUsername:          kafkaUsername,
+                SASLPassword:          kafkaPassword,
+                GroupId:               kafkaGroupID,
+                SSLCALocation:         sslCaLocation,
+                SSLKeyPassword:        sslKeyPassword,
+                SSLKeyLocation:        sslKeyLocation,
+                SSLCertLocation:       sslCertLocation,
+                EnableAutoOffsetStore: enableAutoOffsetStore,
             }
             consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
             if err != nil {
@@ -113,6 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
     command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
     command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
     command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
+    command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
+    command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
+    command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
+    command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
+    command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "to store offset in kafka broker")
     command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
     command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
     command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
diff --git a/cmd/importer.go b/cmd/importer.go
index 553ea21..4cf58ec 100644
--- a/cmd/importer.go
+++ b/cmd/importer.go
@@ -2,6 +2,7 @@ package cmd
 import (
     "fmt"
+
     "github.com/confluentinc/confluent-kafka-go/kafka"
     "github.com/huantt/kafka-dump/impl"
     "github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -17,21 +18,36 @@ func CreateImportCmd() (*cobra.Command, error) {
     var kafkaPassword string
     var kafkaSecurityProtocol string
     var kafkaSASKMechanism string
+    var sslCaLocation string
+    var sslKeyPassword string
+    var sslCertLocation string
+    var sslKeyLocation string
+    var includePartitionAndOffset bool
     command := cobra.Command{
         Use: "import",
         Run: func(cmd *cobra.Command, args []string) {
             log.Infof("Input file: %s", filePath)
-            parquetReader, err := impl.NewParquetReader(filePath)
+            parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
             if err != nil {
                 panic(errors.Wrap(err, "Unable to init parquet file reader"))
             }
             kafkaProducerConfig := kafka_utils.Config{
-                BootstrapServers: kafkaServers,
-                SecurityProtocol: kafkaSecurityProtocol,
-                SASLMechanism:    kafkaSASKMechanism,
-                SASLUsername:     kafkaUsername,
-                SASLPassword:     kafkaPassword,
+                BootstrapServers:          kafkaServers,
+                SecurityProtocol:          kafkaSecurityProtocol,
+                SASLMechanism:             kafkaSASKMechanism,
+                SASLUsername:              kafkaUsername,
+                SASLPassword:              kafkaPassword,
+                ReadTimeoutSeconds:        0,
+                GroupId:                   "",
+                QueueBufferingMaxMessages: 0,
+                QueuedMaxMessagesKbytes:   0,
+                FetchMessageMaxBytes:      0,
+                SSLCALocation:             sslCaLocation,
+                SSLKeyLocation:            sslKeyLocation,
+                SSLCertLocation:           sslCertLocation,
+                SSLKeyPassword:            sslKeyPassword,
+                EnableAutoOffsetStore:     false,
             }
             producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
             if err != nil {
@@ -54,10 +70,6 @@ func CreateImportCmd() (*cobra.Command, error) {
                 }
             }()
             importer := impl.NewImporter(producer, deliveryChan, parquetReader)
-            if err != nil {
-                panic(errors.Wrap(err, "Unable to init importer"))
-            }
-
             err = importer.Run()
             if err != nil {
                 panic(errors.Wrap(err, "Error while running importer"))
@@ -71,5 +83,10 @@ func CreateImportCmd() (*cobra.Command, error) {
     command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
     command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
     command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
+    command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "location of client ca cert file in pem")
+    command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "password for ssl private key passphrase")
+    command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "client's certificate location")
+    command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "path to ssl private key")
+    command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "to store partition and offset of kafka message in file")
     return &command, nil
 }
diff --git a/cmd/parquet_row_counter.go b/cmd/parquet_row_counter.go
index b49d753..8fdde14 100644
--- a/cmd/parquet_row_counter.go
+++ b/cmd/parquet_row_counter.go
@@ -13,7 +13,7 @@ func CreateCountParquetRowCommand() (*cobra.Command, error) {
     command := cobra.Command{
         Use: "count-parquet-rows",
         Run: func(cmd *cobra.Command, args []string) {
-            parquetReader, err := impl.NewParquetReader(filePath)
+            parquetReader, err := impl.NewParquetReader(filePath, false)
             if err != nil {
                 panic(errors.Wrap(err, "Unable to init parquet file reader"))
             }
diff --git a/impl/exporter.go b/impl/exporter.go
index 47af74d..d9c1df5 100644
--- a/impl/exporter.go
+++ b/impl/exporter.go
@@ -1,13 +1,14 @@
 package impl
 
 import (
-    "github.com/confluentinc/confluent-kafka-go/kafka"
-    "github.com/huantt/kafka-dump/pkg/log"
-    "github.com/pkg/errors"
     "os"
     "os/signal"
     "syscall"
     "time"
+
+    "github.com/confluentinc/confluent-kafka-go/kafka"
+    "github.com/huantt/kafka-dump/pkg/log"
+    "github.com/pkg/errors"
 )
 
 type Exporter struct {
@@ -92,8 +93,11 @@ func (e *Exporter) flushData() error {
     }
     _, err = e.consumer.Commit()
     if err != nil {
-        err = errors.Wrap(err, "Failed to commit messages")
-        return err
+        if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
+            log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
+        } else {
+            return errors.Wrap(err, "Failed to commit messages")
+        }
     }
     return nil
 }
diff --git a/impl/importer.go b/impl/importer.go
index d8231f0..d63aada 100644
--- a/impl/importer.go
+++ b/impl/importer.go
@@ -1,11 +1,12 @@
 package impl
 
 import (
-    "github.com/confluentinc/confluent-kafka-go/kafka"
-    "github.com/pkg/errors"
     "os"
     "os/signal"
     "syscall"
+
+    "github.com/confluentinc/confluent-kafka-go/kafka"
+    "github.com/pkg/errors"
 )
 
 type Importer struct {
diff --git a/impl/parquet_reader.go b/impl/parquet_reader.go
index f20cced..8f7a88e 100644
--- a/impl/parquet_reader.go
+++ b/impl/parquet_reader.go
@@ -2,6 +2,9 @@ package impl
 import (
     "encoding/json"
+    "strconv"
+    "time"
+
     "github.com/confluentinc/confluent-kafka-go/kafka"
     "github.com/huantt/kafka-dump/pkg/log"
     "github.com/pkg/errors"
@@ -11,11 +14,12 @@ import (
 )
 
 type ParquetReader struct {
-    parquetReader *reader.ParquetReader
-    fileReader    source.ParquetFile
+    parquetReader             *reader.ParquetReader
+    fileReader                source.ParquetFile
+    includePartitionAndOffset bool
 }
 
-func NewParquetReader(filePath string) (*ParquetReader, error) {
+func NewParquetReader(filePath string, includePartitionAndOffset bool) (*ParquetReader, error) {
     fr, err := local.NewLocalFileReader(filePath)
     if err != nil {
         return nil, errors.Wrap(err, "Failed to init file reader")
@@ -26,8 +30,9 @@ func NewParquetReader(filePath string) (*ParquetReader, error) {
         return nil, errors.Wrap(err, "Failed to init parquet reader")
     }
     return &ParquetReader{
-        fileReader:    fr,
-        parquetReader: parquetReader,
+        fileReader:                fr,
+        parquetReader:             parquetReader,
+        includePartitionAndOffset: includePartitionAndOffset,
     }, nil
 }
 
@@ -47,7 +52,7 @@ func (p *ParquetReader) Read() chan kafka.Message {
             for _, parquetMessage := range parquetMessages {
                 counter++
-                message, err := toKafkaMessage(parquetMessage)
+                message, err := toKafkaMessage(parquetMessage, p.includePartitionAndOffset)
                 if err != nil {
                     err = errors.Wrapf(err, "Failed to parse kafka message from parquet message")
                     panic(err)
@@ -69,7 +74,12 @@ func (p *ParquetReader) GetNumberOfRows() int64 {
     return p.parquetReader.GetNumRows()
 }
 
-func toKafkaMessage(message ParquetMessage) (*kafka.Message, error) {
+func toKafkaMessage(message ParquetMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
+    timestamp, err := time.Parse(time.RFC3339, message.Timestamp)
+    if err != nil {
+        return nil, errors.Wrapf(err, "Failed to convert string to time.Time: %s", message.Timestamp)
+    }
+
     var headers []kafka.Header
     if len(message.Headers) > 0 {
         err := json.Unmarshal([]byte(message.Headers), &headers)
@@ -77,12 +87,38 @@ func toKafkaMessage(message ParquetMessage) (*kafka.Message, error) {
             return nil, errors.Wrapf(err, "Failed to unmarshal kafka headers: %s", message.Headers)
         }
     }
-    return &kafka.Message{
+
+    var timestampType int
+    switch message.TimestampType {
+    case kafka.TimestampCreateTime.String():
+        timestampType = int(kafka.TimestampCreateTime)
+    case kafka.TimestampLogAppendTime.String():
+        timestampType = int(kafka.TimestampLogAppendTime)
+    case kafka.TimestampNotAvailable.String():
+        fallthrough
+    default:
+        timestampType = int(kafka.TimestampNotAvailable)
+    }
+
+    kafkaMessage := &kafka.Message{
         Value: []byte(message.Value),
         TopicPartition: kafka.TopicPartition{
             Topic: &message.Topic,
         },
-        Key:     []byte(message.Key),
-        Headers: headers,
-    }, nil
+        Key:           []byte(message.Key),
+        Headers:       headers,
+        Timestamp:     timestamp,
+        TimestampType: kafka.TimestampType(timestampType),
+    }
+
+    if includePartitionAndOffset {
+        offset, err := strconv.Atoi(message.Offset)
+        if err != nil {
+            return nil, errors.Wrapf(err, "Failed to convert string to int for message offset: %s", message.Offset)
+        }
+        kafkaMessage.TopicPartition.Offset = kafka.Offset(offset)
+        kafkaMessage.TopicPartition.Partition = message.Partition
+    }
+
+    return kafkaMessage, nil
 }
diff --git a/impl/parquet_writer.go b/impl/parquet_writer.go
index 2198e56..8793409 100644
--- a/impl/parquet_writer.go
+++ b/impl/parquet_writer.go
@@ -2,6 +2,8 @@ package impl
 import (
     "encoding/json"
+    "time"
+
     "github.com/confluentinc/confluent-kafka-go/kafka"
     "github.com/huantt/kafka-dump/pkg/log"
     "github.com/pkg/errors"
@@ -26,10 +28,14 @@ func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error) {
 }
 
 type ParquetMessage struct {
-    Value   string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
-    Topic   string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
-    Key     string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
-    Headers string `parquet:"name=headers, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Value         string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Topic         string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Partition     int32  `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
+    Offset        string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Key           string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Headers       string `parquet:"name=headers, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    Timestamp     string `parquet:"name=timestamp, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
+    TimestampType string `parquet:"name=timestamptype, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
 }
 
 func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
@@ -38,10 +44,14 @@ func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
         return errors.Wrap(err, "Failed to marshal msg.Headers")
     }
     message := ParquetMessage{
-        Value:   string(msg.Value),
-        Topic:   *msg.TopicPartition.Topic,
-        Key:     string(msg.Key),
-        Headers: string(headersBytes),
+        Value:         string(msg.Value),
+        Topic:         *msg.TopicPartition.Topic,
+        Partition:     msg.TopicPartition.Partition,
+        Offset:        msg.TopicPartition.Offset.String(),
+        Key:           string(msg.Key),
+        Headers:       string(headersBytes),
+        Timestamp:     msg.Timestamp.Format(time.RFC3339),
+        TimestampType: msg.TimestampType.String(),
     }
 
     err = f.parquetWriter.Write(message)
diff --git a/pkg/kafka_utils/config.go b/pkg/kafka_utils/config.go
index 2f0a830..cb07d22 100644
--- a/pkg/kafka_utils/config.go
+++ b/pkg/kafka_utils/config.go
@@ -11,4 +11,9 @@ type Config struct {
     QueueBufferingMaxMessages int    `json:"queue_buffering_max_messages" mapstructure:"queue_buffering_max_messages"`
     QueuedMaxMessagesKbytes   int64  `json:"queued_max_messages_kbytes" mapstructure:"queued_max_messages_kbytes"`
     FetchMessageMaxBytes      int64  `json:"fetch_message_max_bytes" mapstructure:"fetch_message_max_bytes"`
+    SSLCALocation             string `json:"ssl_ca_location" mapstructure:"ssl_ca_location"`
+    SSLKeyLocation            string `json:"ssl_key_location" mapstructure:"ssl_key_location"`
+    SSLCertLocation           string `json:"ssl_certificate_location" mapstructure:"ssl_certificate_location"`
+    SSLKeyPassword            string `json:"ssl_key_password" mapstructure:"ssl_key_password"`
+    EnableAutoOffsetStore     bool   `json:"enable_auto_offset_store" mapstructure:"enable_auto_offset_store"`
 }
diff --git a/pkg/kafka_utils/consumer.go b/pkg/kafka_utils/consumer.go
index 9f5c028..2b1ec0b 100644
--- a/pkg/kafka_utils/consumer.go
+++ b/pkg/kafka_utils/consumer.go
@@ -9,6 +9,7 @@ func NewConsumer(cfg Config) (*kafka.Consumer, error) {
         "auto.offset.reset": "earliest",
         "group.id":          cfg.GroupId,
     }
+
     if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
         err := config.SetKey("security.protocol", cfg.SecurityProtocol)
         if err != nil {
@@ -27,6 +28,33 @@ func NewConsumer(cfg Config) (*kafka.Consumer, error) {
             return nil, err
         }
     }
+
+    if cfg.SSLCALocation != "" && cfg.SSLKeyLocation != "" && cfg.SSLCertLocation != "" && cfg.SSLKeyPassword != "" {
+        err := config.SetKey("ssl.ca.location", cfg.SSLCALocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.key.location", cfg.SSLKeyLocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.certificate.location", cfg.SSLCertLocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.key.password", cfg.SSLKeyPassword)
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    if !cfg.EnableAutoOffsetStore {
+        err := config.SetKey("enable.auto.offset.store", cfg.EnableAutoOffsetStore)
+        if err != nil {
+            return nil, err
+        }
+    }
+
     if cfg.QueuedMaxMessagesKbytes > 0 {
         err := config.SetKey("fetch.message.max.bytes", cfg.FetchMessageMaxBytes)
         if err != nil {
diff --git a/pkg/kafka_utils/producer.go b/pkg/kafka_utils/producer.go
index 0bd1ef0..a0185d1 100644
--- a/pkg/kafka_utils/producer.go
+++ b/pkg/kafka_utils/producer.go
@@ -31,6 +31,26 @@ func NewProducer(cfg Config) (*kafka.Producer, error) {
             return nil, err
         }
     }
+
+    if cfg.SSLCALocation != "" && cfg.SSLKeyLocation != "" && cfg.SSLCertLocation != "" && cfg.SSLKeyPassword != "" {
+        err := config.SetKey("ssl.ca.location", cfg.SSLCALocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.key.location", cfg.SSLKeyLocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.certificate.location", cfg.SSLCertLocation)
+        if err != nil {
+            return nil, err
+        }
+        err = config.SetKey("ssl.key.password", cfg.SSLKeyPassword)
+        if err != nil {
+            return nil, err
+        }
+    }
+
     kafkaProducer, err := kafka.NewProducer(&config)
     if err != nil {
         return nil, err
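
For readers wiring these options up from Go rather than through the CLI, below is a minimal, illustrative sketch (not part of the patch) of how the new `kafka_utils.Config` fields could be passed to `kafka_utils.NewConsumer`. The broker address, group id, credentials, and certificate paths are placeholder values. Note that, per the conditions added in `NewConsumer` and `NewProducer`, the `ssl.*` keys are only applied when all four SSL fields are non-empty, and the `security.protocol`/SASL keys are only applied when all four SASL-related fields are set.

```go
package main

import (
	"log"

	"github.com/huantt/kafka-dump/pkg/kafka_utils"
)

func main() {
	cfg := kafka_utils.Config{
		BootstrapServers: "localhost:9092", // placeholder broker address
		SecurityProtocol: "SSL",            // applied only together with the SASL fields below
		SASLMechanism:    "PLAIN",
		SASLUsername:     "admin",
		SASLPassword:     "admin",
		GroupId:          "kafka-dump.local", // placeholder consumer group id

		// All four SSL fields must be non-empty for NewConsumer/NewProducer
		// to set the corresponding ssl.* keys on the underlying client.
		SSLCALocation:   "/path/to/ca.pem",
		SSLCertLocation: "/path/to/client.pem",
		SSLKeyLocation:  "/path/to/client.key",
		SSLKeyPassword:  "key-passphrase",

		// false maps to enable.auto.offset.store=false on the consumer.
		EnableAutoOffsetStore: false,
	}

	consumer, err := kafka_utils.NewConsumer(cfg)
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer consumer.Close()
}
```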