
Commit

adding configurable ssl configs and change the format of storing kafka messages
shubhamcoc committed Sep 12, 2023
1 parent afd1550 commit 50dbe2c
Showing 10 changed files with 182 additions and 45 deletions.
32 changes: 24 additions & 8 deletions cmd/exporter.go
@@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"sync"
"time"

"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/gcs_utils"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -12,8 +15,6 @@ import (
"github.com/xitongsys/parquet-go-source/gcs"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"
"sync"
"time"
)

func CreateExportCommand() (*cobra.Command, error) {
@@ -34,18 +35,28 @@ func CreateExportCommand() (*cobra.Command, error) {
var storageType string
var gcsBucketName string
var gcsProjectID string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var enableAutoOffsetStore bool

command := cobra.Command{
Use: "export",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Limit: %d - Concurrent consumers: %d", exportLimitPerFile, concurrentConsumers)
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
SSLCALocation: sslCaLocation,
SSLKeyPassword: sslKeyPassword,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
@@ -113,6 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file (PEM)")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Passphrase for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Location of the client's SSL certificate")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "Automatically store the offset of each consumed message (enable.auto.offset.store)")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
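For context, a hedged example of how the new export flags might be combined on a run (only flags added in this diff are shown; the broker, topic, and output-file flags are assumed from the existing CLI): `--ssl-ca-location ca.pem --ssl-certificate-location client.pem --ssl-key-location client.key --ssl-key-password changeit --enable-auto-offset-store=false`.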
37 changes: 27 additions & 10 deletions cmd/importer.go
@@ -2,6 +2,7 @@ package cmd

import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -17,21 +18,36 @@ func CreateImportCmd() (*cobra.Command, error) {
var kafkaPassword string
var kafkaSecurityProtocol string
var kafkaSASKMechanism string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool

command := cobra.Command{
Use: "import",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Input file: %s", filePath)
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
ReadTimeoutSeconds: 0,
GroupId: "",
QueueBufferingMaxMessages: 0,
QueuedMaxMessagesKbytes: 0,
FetchMessageMaxBytes: 0,
SSLCALocation: sslCaLocation,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
@@ -54,10 +70,6 @@ func CreateImportCmd() (*cobra.Command, error) {
}
}()
importer := impl.NewImporter(producer, deliveryChan, parquetReader)
if err != nil {
panic(errors.Wrap(err, "Unable to init importer"))
}

err = importer.Run()
if err != nil {
panic(errors.Wrap(err, "Error while running importer"))
@@ -71,5 +83,10 @@ func CreateImportCmd() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file (PEM)")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Passphrase for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Location of the client's SSL certificate")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "Include the partition and offset stored in the file when importing messages")
return &command, nil
}
2 changes: 1 addition & 1 deletion cmd/parquet_row_counter.go
@@ -13,7 +13,7 @@ func CreateCountParquetRowCommand() (*cobra.Command, error) {
command := cobra.Command{
Use: "count-parquet-rows",
Run: func(cmd *cobra.Command, args []string) {
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, false)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
14 changes: 9 additions & 5 deletions impl/exporter.go
@@ -1,13 +1,14 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
)

type Exporter struct {
@@ -92,8 +93,11 @@ func (e *Exporter) flushData() error {
}
_, err = e.consumer.Commit()
if err != nil {
err = errors.Wrap(err, "Failed to commit messages")
return err
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
} else {
return errors.Wrap(err, "Failed to commit messages")
}
}
return nil
}
5 changes: 3 additions & 2 deletions impl/importer.go
@@ -1,11 +1,12 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
)

type Importer struct {
58 changes: 47 additions & 11 deletions impl/parquet_reader.go
@@ -2,6 +2,9 @@ package impl

import (
"encoding/json"
"strconv"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
@@ -11,11 +14,12 @@
)

type ParquetReader struct {
parquetReader *reader.ParquetReader
fileReader source.ParquetFile
parquetReader *reader.ParquetReader
fileReader source.ParquetFile
includePartitionAndOffset bool
}

func NewParquetReader(filePath string) (*ParquetReader, error) {
func NewParquetReader(filePath string, includePartitionAndOffset bool) (*ParquetReader, error) {
fr, err := local.NewLocalFileReader(filePath)
if err != nil {
return nil, errors.Wrap(err, "Failed to init file reader")
@@ -26,8 +30,9 @@ return nil, errors.Wrap(err, "Failed to init parquet reader")
return nil, errors.Wrap(err, "Failed to init parquet reader")
}
return &ParquetReader{
fileReader: fr,
parquetReader: parquetReader,
fileReader: fr,
parquetReader: parquetReader,
includePartitionAndOffset: includePartitionAndOffset,
}, nil
}

@@ -47,7 +52,7 @@ func (p *ParquetReader) Read() chan kafka.Message {

for _, parquetMessage := range parquetMessages {
counter++
message, err := toKafkaMessage(parquetMessage)
message, err := toKafkaMessage(parquetMessage, p.includePartitionAndOffset)
if err != nil {
err = errors.Wrapf(err, "Failed to parse kafka message from parquet message")
panic(err)
@@ -69,20 +74,51 @@ func (p *ParquetReader) GetNumberOfRows() int64 {
return p.parquetReader.GetNumRows()
}

func toKafkaMessage(message ParquetMessage) (*kafka.Message, error) {
func toKafkaMessage(message ParquetMessage, includePartitionAndOffset bool) (*kafka.Message, error) {
timestamp, err := time.Parse(time.RFC3339, message.Timestamp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to convert string to time.Time: %s", message.Timestamp)
}

var headers []kafka.Header
if len(message.Headers) > 0 {
err := json.Unmarshal([]byte(message.Headers), &headers)
if err != nil {
return nil, errors.Wrapf(err, "Failed to unmarshal kafka headers: %s", message.Headers)
}
}
return &kafka.Message{

var timestampType int
switch message.TimestampType {
case kafka.TimestampCreateTime.String():
timestampType = int(kafka.TimestampCreateTime)
case kafka.TimestampLogAppendTime.String():
timestampType = int(kafka.TimestampLogAppendTime)
case kafka.TimestampNotAvailable.String():
fallthrough
default:
timestampType = int(kafka.TimestampNotAvailable)
}

kafkaMessage := &kafka.Message{
Value: []byte(message.Value),
TopicPartition: kafka.TopicPartition{
Topic: &message.Topic,
},
Key: []byte(message.Key),
Headers: headers,
}, nil
Key: []byte(message.Key),
Headers: headers,
Timestamp: timestamp,
TimestampType: kafka.TimestampType(timestampType),
}

if includePartitionAndOffset {
offset, err := strconv.Atoi(message.Offset)
if err != nil {
return nil, errors.Wrapf(err, "Failed to convert string to int for message offset: %s", message.Offset)
}
kafkaMessage.TopicPartition.Offset = kafka.Offset(offset)
kafkaMessage.TopicPartition.Partition = message.Partition
}

return kafkaMessage, nil
}
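A side note on the new timestamp columns handled above: the writer stores `msg.Timestamp` as an RFC3339 string and `toKafkaMessage` parses it back, so second-level precision round-trips while fractional seconds are dropped. A minimal stdlib-only sketch (not part of this commit) illustrating that behaviour:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Writer side (parquet_writer.go): the timestamp is serialized with time.RFC3339.
	original := time.Date(2023, 9, 12, 10, 30, 0, 123456789, time.UTC)
	stored := original.Format(time.RFC3339) // "2023-09-12T10:30:00Z" - fractional seconds are dropped

	// Reader side (toKafkaMessage): the string is parsed back with time.Parse(time.RFC3339).
	restored, err := time.Parse(time.RFC3339, stored)
	if err != nil {
		panic(err)
	}
	fmt.Println(stored)                                          // 2023-09-12T10:30:00Z
	fmt.Println(original.Truncate(time.Second).Equal(restored)) // true
}
```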
26 changes: 18 additions & 8 deletions impl/parquet_writer.go
@@ -2,6 +2,8 @@ package impl

import (
"encoding/json"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
@@ -26,10 +28,14 @@ func NewParquetWriter(fileWriter source.ParquetFile) (*ParquetWriter, error) {
}

type ParquetMessage struct {
Value string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Topic string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Key string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Headers string `parquet:"name=headers, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Value string `parquet:"name=value, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Topic string `parquet:"name=topic, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Partition int32 `parquet:"name=partition, type=INT32, convertedtype=INT_32"`
Offset string `parquet:"name=offset, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Key string `parquet:"name=key, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Headers string `parquet:"name=headers, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
Timestamp string `parquet:"name=timestamp, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
TimestampType string `parquet:"name=timestamptype, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN"`
}

func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
@@ -38,10 +44,14 @@ func (f *ParquetWriter) Write(msg kafka.Message) (err error) {
return errors.Wrap(err, "Failed to marshal msg.Headers")
}
message := ParquetMessage{
Value: string(msg.Value),
Topic: *msg.TopicPartition.Topic,
Key: string(msg.Key),
Headers: string(headersBytes),
Value: string(msg.Value),
Topic: *msg.TopicPartition.Topic,
Partition: msg.TopicPartition.Partition,
Offset: msg.TopicPartition.Offset.String(),
Key: string(msg.Key),
Headers: string(headersBytes),
Timestamp: msg.Timestamp.Format(time.RFC3339),
TimestampType: msg.TimestampType.String(),
}

err = f.parquetWriter.Write(message)
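To illustrate the new storage format, here is what a single record written by `Write` might look like under the extended schema (a hypothetical example inside `package impl`; all values are made up, `Offset` uses the string form produced by `kafka.Offset.String()`, and `Headers` is the JSON encoding of `[]kafka.Header`, whose byte values are base64-encoded by `encoding/json`):

```go
// Hypothetical record, assuming the ParquetMessage type defined above.
example := ParquetMessage{
	Value:         `{"order_id":42,"status":"created"}`,
	Topic:         "orders",
	Partition:     3,
	Offset:        "1027",
	Key:           "order-42",
	Headers:       `[{"Key":"trace-id","Value":"YWJjMTIz"}]`,
	Timestamp:     "2023-09-12T10:30:00Z",
	TimestampType: "CreateTime",
}
_ = example
```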
5 changes: 5 additions & 0 deletions pkg/kafka_utils/config.go
@@ -11,4 +11,9 @@ type Config struct {
QueueBufferingMaxMessages int `json:"queue_buffering_max_messages" mapstructure:"queue_buffering_max_messages"`
QueuedMaxMessagesKbytes int64 `json:"queued_max_messages_kbytes" mapstructure:"queued_max_messages_kbytes"`
FetchMessageMaxBytes int64 `json:"fetch_message_max_bytes" mapstructure:"fetch_message_max_bytes"`
SSLCALocation string `json:"ssl_ca_location" mapstructure:"ssl_ca_location"`
SSLKeyLocation string `json:"ssl_key_location" mapstructure:"ssl_key_location"`
SSLCertLocation string `json:"ssl_certificate_location" mapstructure:"ssl_certificate_location"`
SSLKeyPassword string `json:"ssl_key_password" mapstructure:"ssl_key_password"`
EnableAutoOffsetStore bool `json:"enable_auto_offset_store" mapstructure:"enable_auto_offset_store"`
}
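A minimal sketch of wiring the new fields through `kafka_utils` from application code (broker address, group ID, and file paths are placeholders; as `consumer.go` below shows, the `ssl.*` keys are only applied when all four SSL fields are non-empty):

```go
package main

import (
	"github.com/huantt/kafka-dump/pkg/kafka_utils"
	"github.com/pkg/errors"
)

func main() {
	cfg := kafka_utils.Config{
		BootstrapServers:      "broker-1:9093",        // placeholder
		GroupId:               "kafka-dump-exporter",  // placeholder
		SSLCALocation:         "/etc/kafka/certs/ca.pem",
		SSLCertLocation:       "/etc/kafka/certs/client.pem",
		SSLKeyLocation:        "/etc/kafka/certs/client.key",
		SSLKeyPassword:        "changeit",
		EnableAutoOffsetStore: false, // sets enable.auto.offset.store=false in NewConsumer
	}
	consumer, err := kafka_utils.NewConsumer(cfg)
	if err != nil {
		panic(errors.Wrap(err, "Unable to init consumer"))
	}
	defer consumer.Close()
}
```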
28 changes: 28 additions & 0 deletions pkg/kafka_utils/consumer.go
@@ -9,6 +9,7 @@ func NewConsumer(cfg Config) (*kafka.Consumer, error) {
"auto.offset.reset": "earliest",
"group.id": cfg.GroupId,
}

if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
err := config.SetKey("security.protocol", cfg.SecurityProtocol)
if err != nil {
@@ -27,6 +28,33 @@ func NewConsumer(cfg Config) (*kafka.Consumer, error) {
return nil, err
}
}

if cfg.SSLCALocation != "" && cfg.SSLKeyLocation != "" && cfg.SSLCertLocation != "" && cfg.SSLKeyPassword != "" {
err := config.SetKey("ssl.ca.location", cfg.SSLCALocation)
if err != nil {
return nil, err
}
err = config.SetKey("ssl.key.location", cfg.SSLKeyLocation)
if err != nil {
return nil, err
}
err = config.SetKey("ssl.certificate.location", cfg.SSLCertLocation)
if err != nil {
return nil, err
}
err = config.SetKey("ssl.key.password", cfg.SSLKeyPassword)
if err != nil {
return nil, err
}
}

if !cfg.EnableAutoOffsetStore {
err := config.SetKey("enable.auto.offset.store", cfg.EnableAutoOffsetStore)
if err != nil {
return nil, err
}
}

if cfg.QueuedMaxMessagesKbytes > 0 {
err := config.SetKey("fetch.message.max.bytes", cfg.FetchMessageMaxBytes)
if err != nil {
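For reference, a successful SSL path through `NewConsumer` above ends up setting librdkafka keys equivalent to the following raw `kafka.ConfigMap` (a hedged sketch with placeholder values; `bootstrap.servers` is set in the part of the function not shown in this diff):

```go
package main

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Equivalent raw configuration that NewConsumer builds via config.SetKey.
var config = kafka.ConfigMap{
	"bootstrap.servers":        "broker-1:9093",
	"auto.offset.reset":        "earliest",
	"group.id":                 "kafka-dump-exporter",
	"ssl.ca.location":          "/etc/kafka/certs/ca.pem",
	"ssl.key.location":         "/etc/kafka/certs/client.key",
	"ssl.certificate.location": "/etc/kafka/certs/client.pem",
	"ssl.key.password":         "changeit",
	"enable.auto.offset.store": false,
}

func main() { _ = config }
```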

