feat: adding configurable ssl configs and change the format of storing kafk… #5

Merged: 1 commit, Sep 12, 2023
62 changes: 54 additions & 8 deletions README.md
@@ -37,12 +37,18 @@ Flags:
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for new message, then this process will be marked as finish. Set -1 to wait forever. (default 30)
--queued-max-messages-kbytes int Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
--ssl-ca-location string Location of the client CA certificate file in PEM format
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to the SSL private key
--ssl-key-password string Passphrase for the SSL private key
--storage string Storage type: local file (file) or Google Cloud Storage (gcs) (default "file")

Global Flags:
--log-level string Log level (default "info")
```
#### Sample

- Export data from a Kafka cluster that does not use SSL encryption.
```shell
kafka-dump export \
--storage=file \
@@ -56,24 +62,49 @@ kafka-dump export \
--kafka-sasl-mechanism=PLAIN
```

- Export data from a Kafka cluster with SSL encryption enabled.
```shell
kafka-dump export \
--storage=file \
--file=path/to/output/data.parquet \
--kafka-topics=users-activities \
--kafka-group-id=kafka-dump.local \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

### Import Kafka topics from parquet file
```shell
Usage:
import [flags]

Flags:
-f, --file string Output file path (required)
-h, --help help for import
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
-f, --file string Output file path (required)
-h, --help help for import
-i, --include-partition-and-offset Store the partition and offset of Kafka messages in the file
--kafka-password string Kafka password
--kafka-sasl-mechanism string Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
--ssl-ca-location string Location of the client CA certificate file in PEM format
--ssl-certificate-location string Client certificate location
--ssl-key-location string Path to the SSL private key
--ssl-key-password string Passphrase for the SSL private key

Global Flags:
--log-level string Log level (default "info")
```
#### Sample

- Import data into a Kafka cluster that does not use SSL encryption.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
@@ -84,6 +115,21 @@ kafka-dump import \
--kafka-sasl-mechanism=PLAIN
```

- Import data into a Kafka cluster with SSL encryption enabled.
```shell
kafka-dump import \
--file=path/to/input/data.parquet \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SSL \
--kafka-sasl-mechanism=PLAIN \
--ssl-ca-location=<path to ssl cacert> \
--ssl-certificate-location=<path to ssl cert> \
--ssl-key-location=<path to ssl key> \
--ssl-key-password=<ssl key password>
```

### Stream messages topic to topic
```shell
Usage:
@@ -166,4 +212,4 @@ kafka-dump export \

## TODO
- Import topics from multiple files or directory
- Import topics from Google Cloud Storage files or directory
32 changes: 24 additions & 8 deletions cmd/exporter.go
@@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"sync"
"time"

"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/gcs_utils"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -12,8 +15,6 @@ import (
"github.com/xitongsys/parquet-go-source/gcs"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"
"sync"
"time"
)

func CreateExportCommand() (*cobra.Command, error) {
@@ -34,18 +35,28 @@ func CreateExportCommand() (*cobra.Command, error) {
var storageType string
var gcsBucketName string
var gcsProjectID string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var enableAutoOffsetStore bool

command := cobra.Command{
Use: "export",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Limit: %d - Concurrent consumers: %d", exportLimitPerFile, concurrentConsumers)
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
SSLCALocation: sslCaLocation,
SSLKeyPassword: sslKeyPassword,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
EnableAutoOffsetStore: enableAutoOffsetStore,
}
consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
@@ -113,6 +124,11 @@ func CreateExportCommand() (*cobra.Command, error) {
command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file in PEM format")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Passphrase for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVar(&enableAutoOffsetStore, "enable-auto-offset-store", true, "Store the offset of consumed messages in the Kafka broker")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
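The `kafka_utils` package itself is not part of this diff. For orientation, the sketch below shows how the new SSL fields in `Config` might be passed through to librdkafka when the consumer is built; the `Config` struct layout and the `NewConsumer` body are assumptions based on the field names used above, not the repository's actual code.

```go
// Sketch only: a minimal kafka_utils.Config and a NewConsumer that maps the
// SSL fields added in this PR onto librdkafka properties.
package kafka_utils

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Config mirrors the fields referenced by the export command; the real
// struct in pkg/kafka_utils may contain more fields.
type Config struct {
	BootstrapServers      string
	SecurityProtocol      string
	SASLMechanism         string
	SASLUsername          string
	SASLPassword          string
	GroupId               string
	SSLCALocation         string
	SSLCertLocation       string
	SSLKeyLocation        string
	SSLKeyPassword        string
	EnableAutoOffsetStore bool
}

func NewConsumer(cfg Config) (*kafka.Consumer, error) {
	configMap := &kafka.ConfigMap{
		"bootstrap.servers":        cfg.BootstrapServers,
		"group.id":                 cfg.GroupId,
		"enable.auto.offset.store": cfg.EnableAutoOffsetStore,
	}
	if cfg.SecurityProtocol != "" {
		_ = configMap.SetKey("security.protocol", cfg.SecurityProtocol)
		_ = configMap.SetKey("sasl.mechanism", cfg.SASLMechanism)
		_ = configMap.SetKey("sasl.username", cfg.SASLUsername)
		_ = configMap.SetKey("sasl.password", cfg.SASLPassword)
	}
	// Only set the SSL properties that were actually provided, so plaintext
	// setups keep working with the new flags left empty.
	for key, value := range map[string]string{
		"ssl.ca.location":          cfg.SSLCALocation,
		"ssl.certificate.location": cfg.SSLCertLocation,
		"ssl.key.location":         cfg.SSLKeyLocation,
		"ssl.key.password":         cfg.SSLKeyPassword,
	} {
		if value != "" {
			_ = configMap.SetKey(key, value)
		}
	}
	return kafka.NewConsumer(configMap)
}
```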
37 changes: 27 additions & 10 deletions cmd/importer.go
@@ -2,6 +2,7 @@ package cmd

import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/impl"
"github.com/huantt/kafka-dump/pkg/kafka_utils"
@@ -17,21 +18,36 @@ func CreateImportCmd() (*cobra.Command, error) {
var kafkaPassword string
var kafkaSecurityProtocol string
var kafkaSASKMechanism string
var sslCaLocation string
var sslKeyPassword string
var sslCertLocation string
var sslKeyLocation string
var includePartitionAndOffset bool

command := cobra.Command{
Use: "import",
Run: func(cmd *cobra.Command, args []string) {
log.Infof("Input file: %s", filePath)
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, includePartitionAndOffset)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
ReadTimeoutSeconds: 0,
GroupId: "",
QueueBufferingMaxMessages: 0,
QueuedMaxMessagesKbytes: 0,
FetchMessageMaxBytes: 0,
SSLCALocation: sslCaLocation,
SSLKeyLocation: sslKeyLocation,
SSLCertLocation: sslCertLocation,
SSLKeyPassword: sslKeyPassword,
EnableAutoOffsetStore: false,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
@@ -54,10 +70,6 @@
}
}()
importer := impl.NewImporter(producer, deliveryChan, parquetReader)
if err != nil {
panic(errors.Wrap(err, "Unable to init importer"))
}

err = importer.Run()
if err != nil {
panic(errors.Wrap(err, "Error while running importer"))
@@ -71,5 +83,10 @@
command.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
command.Flags().StringVar(&sslCaLocation, "ssl-ca-location", "", "Location of the client CA certificate file in PEM format")
command.Flags().StringVar(&sslKeyPassword, "ssl-key-password", "", "Passphrase for the SSL private key")
command.Flags().StringVar(&sslCertLocation, "ssl-certificate-location", "", "Client certificate location")
command.Flags().StringVar(&sslKeyLocation, "ssl-key-location", "", "Path to the SSL private key")
command.Flags().BoolVarP(&includePartitionAndOffset, "include-partition-and-offset", "i", false, "Store the partition and offset of Kafka messages in the file")
return &command, nil
}
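The corresponding change inside `impl` (the second parameter of `NewParquetReader` and how the stored partition and offset are used) is not shown in this diff. Conceptually, when `--include-partition-and-offset` is set, a row read back from the parquet file could carry its original partition into the produced record, roughly as in the sketch below; the helper name and parameters are illustrative, not the project's actual API.

```go
package impl

import "github.com/confluentinc/confluent-kafka-go/kafka"

// toKafkaMessage is a hypothetical helper: it rebuilds a kafka.Message from
// columns read out of the parquet file. When includePartitionAndOffset is
// set, the record is routed back to the partition it was exported from;
// otherwise the producer's partitioner picks one as usual. (Offsets are
// assigned by the broker and cannot be set by a producer, so a stored offset
// is only useful as metadata.)
func toKafkaMessage(topic string, key, value []byte, partition int32, includePartitionAndOffset bool) *kafka.Message {
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            key,
		Value:          value,
	}
	if includePartitionAndOffset {
		msg.TopicPartition.Partition = partition
	}
	return msg
}
```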
2 changes: 1 addition & 1 deletion cmd/parquet_row_counter.go
@@ -13,7 +13,7 @@ func CreateCountParquetRowCommand() (*cobra.Command, error) {
command := cobra.Command{
Use: "count-parquet-rows",
Run: func(cmd *cobra.Command, args []string) {
parquetReader, err := impl.NewParquetReader(filePath)
parquetReader, err := impl.NewParquetReader(filePath, false)
if err != nil {
panic(errors.Wrap(err, "Unable to init parquet file reader"))
}
14 changes: 9 additions & 5 deletions impl/exporter.go
@@ -1,13 +1,14 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
)

type Exporter struct {
@@ -92,8 +93,11 @@ func (e *Exporter) flushData() error {
}
_, err = e.consumer.Commit()
if err != nil {
err = errors.Wrap(err, "Failed to commit messages")
return err
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrNoOffset {
log.Warnf("No offset, it can happen when there is no message to read, error is: %v", err)
} else {
return errors.Wrap(err, "Failed to commit messages")
}
}
return nil
}
5 changes: 3 additions & 2 deletions impl/importer.go
@@ -1,11 +1,12 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
)

type Importer struct {