refactor: refactor main.go
huantt committed Nov 26, 2022
1 parent 2705e4d commit 3a1018e
Showing 7 changed files with 375 additions and 259 deletions.
48 changes: 31 additions & 17 deletions README.md
@@ -79,17 +79,25 @@ kafka-dump import \

### Stream messages from topic to topic
```shell
Usage:
  stream [flags]

Flags:
-  -h, --help                                      help for stream
-      --kafka-group-id string                     Kafka consumer group ID
-      --kafka-password string                     Kafka password
-      --kafka-sasl-mechanism string               Kafka SASL mechanism
-      --kafka-security-protocol string            Kafka security protocol
-      --kafka-servers string                      Kafka servers string
-      --kafka-username string                     Kafka username
+      --from-kafka-group-id string                Source Kafka consumer group ID
+      --from-kafka-password string                Source Kafka password
+      --from-kafka-sasl-mechanism string          Source Kafka SASL mechanism
+      --from-kafka-security-protocol string       Source Kafka security protocol
+      --from-kafka-servers string                 Source Kafka servers string
+      --from-kafka-username string                Source Kafka username
+      --from-topic string                         Source topic
+  -h, --help                                      help for stream
+      --max-waiting-seconds-for-new-message int   Max waiting seconds for a new message, after which this process is marked as finished. Set -1 to wait forever. (default 30)
-      --topic-from string                         Source topic
-      --topic-to string                           Destination topic
+      --to-kafka-password string                  Destination Kafka password
+      --to-kafka-sasl-mechanism string            Destination Kafka SASL mechanism
+      --to-kafka-security-protocol string         Destination Kafka security protocol
+      --to-kafka-servers string                   Destination Kafka servers string
+      --to-kafka-username string                  Destination Kafka username
+      --to-topic string                           Destination topic

Global Flags:
--log-level string Log level (default "info")
@@ -99,14 +107,20 @@
#### Sample
```shell
kafka-dump stream \
-  --topic-from=users \
-  --topic-to=new-users \
-  --kafka-group-id=stream \
-  --kafka-servers=localhost:9092 \
-  --kafka-username=admin \
-  --kafka-password=admin \
-  --kafka-security-protocol=SASL_SSL \
-  --kafka-sasl-mechanism=PLAIN
+  --from-topic=users \
+  --from-kafka-group-id=stream \
+  --from-kafka-servers=localhost:9092 \
+  --from-kafka-username=admin \
+  --from-kafka-password=admin \
+  --from-kafka-security-protocol=SASL_SSL \
+  --from-kafka-sasl-mechanism=PLAIN \
+  --to-topic=new-users \
+  --to-kafka-servers=localhost:9092 \
+  --to-kafka-username=admin \
+  --to-kafka-password=admin \
+  --to-kafka-security-protocol=SASL_SSL \
+  --to-kafka-sasl-mechanism=PLAIN \
+  --max-waiting-seconds-for-new-message=-1
```

### Count the number of rows in a parquet file
101 changes: 101 additions & 0 deletions cmd/exporter.go
@@ -0,0 +1,101 @@
package cmd

import (
	"fmt"
	"github.com/huantt/kafka-dump/impl"
	"github.com/huantt/kafka-dump/pkg/kafka_utils"
	"github.com/huantt/kafka-dump/pkg/log"
	"github.com/pkg/errors"
	"github.com/spf13/cobra"
	"sync"
	"time"
)

func CreateExportCommand() (*cobra.Command, error) {
	var filePath string
	var kafkaServers string
	var kafkaUsername string
	var kafkaPassword string
	var kafkaSecurityProtocol string
	var kafkaSASLMechanism string
	var kafkaGroupID string
	var topics *[]string
	var exportLimitPerFile uint64
	var maxWaitingSecondsForNewMessage int
	var concurrentConsumers = 1

	command := cobra.Command{
		Use: "export",
		Run: func(cmd *cobra.Command, args []string) {
			log.Infof("Limit: %d - Concurrent consumers: %d", exportLimitPerFile, concurrentConsumers)
			kafkaConsumerConfig := kafka_utils.Config{
				BootstrapServers: kafkaServers,
				SecurityProtocol: kafkaSecurityProtocol,
				SASLMechanism:    kafkaSASLMechanism,
				SASLUsername:     kafkaUsername,
				SASLPassword:     kafkaPassword,
				GroupId:          kafkaGroupID,
			}
			consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
			if err != nil {
				panic(errors.Wrap(err, "Unable to init consumer"))
			}
			maxWaitingTimeForNewMessage := time.Duration(maxWaitingSecondsForNewMessage) * time.Second
			options := &impl.Options{
				Limit:                       exportLimitPerFile,
				MaxWaitingTimeForNewMessage: &maxWaitingTimeForNewMessage,
			}

			var wg sync.WaitGroup
			wg.Add(concurrentConsumers)
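			// Note: all workers share the single consumer created above; each writes to its own output file.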
			for i := 0; i < concurrentConsumers; i++ {
				go func(workerID int) {
					defer wg.Done()
					for {
						outputFilePath := filePath
						if exportLimitPerFile > 0 {
							outputFilePath = fmt.Sprintf("%s.%d", filePath, time.Now().UnixMilli())
						}
						log.Infof("[Worker-%d] Exporting to: %s", workerID, outputFilePath)
						parquetWriter, err := impl.NewParquetWriter(outputFilePath)
						if err != nil {
							panic(errors.Wrap(err, "Unable to init parquet file writer"))
						}
						exporter, err := impl.NewExporter(consumer, *topics, parquetWriter, options)
						if err != nil {
							panic(errors.Wrap(err, "Failed to init exporter"))
						}

						exportedCount, err := exporter.Run()
						if err != nil {
							panic(errors.Wrap(err, "Error while running exporter"))
						}
						log.Infof("[Worker-%d] Exported %d messages", workerID, exportedCount)
						if exportLimitPerFile == 0 || exportedCount < exportLimitPerFile {
							log.Infof("[Worker-%d] Finished!", workerID)
							return
						}
					}
				}(i)
			}
			wg.Wait()
		},
	}
	command.Flags().StringVarP(&filePath, "file", "f", "", "Output file path (required)")
	command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
	command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
	command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
	command.Flags().StringVar(&kafkaSASLMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
	command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
	command.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
	command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
	command.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max waiting seconds for a new message, after which this process is marked as finished. Set -1 to wait forever.")
	command.Flags().IntVar(&concurrentConsumers, "concurrent-consumers", 1, "Number of concurrent consumers")
	topics = command.Flags().StringArray("kafka-topics", nil, "Kafka topics")
	command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
	err := command.MarkFlagRequired("file")
	if err != nil {
		return nil, err
	}
	return &command, nil
}
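For reference, a possible invocation of the new export command (broker address, topic, and file name are illustrative; only flags registered above are used):

```shell
kafka-dump export \
  --file=messages.parquet \
  --kafka-topics=users \
  --kafka-group-id=export \
  --kafka-servers=localhost:9092 \
  --limit=1000000 \
  --concurrent-consumers=4
```

With `--limit` set, each worker writes a series of files suffixed with a millisecond timestamp, as in the loop above.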
75 changes: 75 additions & 0 deletions cmd/importer.go
@@ -0,0 +1,75 @@
package cmd

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/huantt/kafka-dump/impl"
	"github.com/huantt/kafka-dump/pkg/kafka_utils"
	"github.com/huantt/kafka-dump/pkg/log"
	"github.com/pkg/errors"
	"github.com/spf13/cobra"
)

func CreateImportCmd() (*cobra.Command, error) {
	var filePath string
	var kafkaServers string
	var kafkaUsername string
	var kafkaPassword string
	var kafkaSecurityProtocol string
	var kafkaSASLMechanism string

	command := cobra.Command{
		Use: "import",
		Run: func(cmd *cobra.Command, args []string) {
			log.Infof("Input file: %s", filePath)
			parquetReader, err := impl.NewParquetReader(filePath)
			if err != nil {
				panic(errors.Wrap(err, "Unable to init parquet file reader"))
			}
			kafkaProducerConfig := kafka_utils.Config{
				BootstrapServers: kafkaServers,
				SecurityProtocol: kafkaSecurityProtocol,
				SASLMechanism:    kafkaSASLMechanism,
				SASLUsername:     kafkaUsername,
				SASLPassword:     kafkaPassword,
			}
			producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
			if err != nil {
				panic(errors.Wrap(err, "Unable to create producer"))
			}
			queueBufferingMaxMessages := kafka_utils.DefaultQueueBufferingMaxMessages
			if kafkaProducerConfig.QueueBufferingMaxMessages > 0 {
				queueBufferingMaxMessages = kafkaProducerConfig.QueueBufferingMaxMessages
			}
			deliveryChan := make(chan kafka.Event, queueBufferingMaxMessages)
			go func() { // Tricky: the Kafka client requires a dedicated delivery channel for Flush to work
				for e := range deliveryChan {
					m := e.(*kafka.Message)
					if m.TopicPartition.Error != nil {
						panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
					} else {
						log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
							*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
					}
				}
			}()
			importer := impl.NewImporter(producer, deliveryChan, parquetReader)

			err = importer.Run()
			if err != nil {
				panic(errors.Wrap(err, "Error while running importer"))
			}
		},
	}
	command.Flags().StringVarP(&filePath, "file", "f", "", "Input file path (required)")
	command.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
	command.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
	command.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
	command.Flags().StringVar(&kafkaSASLMechanism, "kafka-sasl-mechanism", "", "Kafka SASL mechanism")
	command.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
	command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
	err := command.MarkFlagRequired("file")
	if err != nil {
		return nil, err
	}
	return &command, nil
}
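A matching invocation for the import command (values illustrative; the four SASL flags may be omitted together for an unauthenticated broker, per MarkFlagsRequiredTogether):

```shell
kafka-dump import \
  --file=messages.parquet \
  --kafka-servers=localhost:9092 \
  --kafka-username=admin \
  --kafka-password=admin \
  --kafka-security-protocol=SASL_SSL \
  --kafka-sasl-mechanism=PLAIN
```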
29 changes: 29 additions & 0 deletions cmd/parquet_row_counter.go
@@ -0,0 +1,29 @@
package cmd

import (
	"github.com/huantt/kafka-dump/impl"
	"github.com/huantt/kafka-dump/pkg/log"
	"github.com/pkg/errors"
	"github.com/spf13/cobra"
)

func CreateCountParquetRowCommand() (*cobra.Command, error) {
	var filePath string

	command := cobra.Command{
		Use: "count-parquet-rows",
		Run: func(cmd *cobra.Command, args []string) {
			parquetReader, err := impl.NewParquetReader(filePath)
			if err != nil {
				panic(errors.Wrap(err, "Unable to init parquet file reader"))
			}
			log.Infof("Number of rows: %d", parquetReader.GetNumberOfRows())
		},
	}
	command.Flags().StringVarP(&filePath, "file", "f", "", "File path (required)")
	err := command.MarkFlagRequired("file")
	if err != nil {
		return nil, err
	}
	return &command, nil
}
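A matching invocation (file name illustrative):

```shell
kafka-dump count-parquet-rows --file=messages.parquet
```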
121 changes: 121 additions & 0 deletions cmd/streamer.go
@@ -0,0 +1,121 @@
package cmd

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/huantt/kafka-dump/impl"
	"github.com/huantt/kafka-dump/pkg/kafka_utils"
	"github.com/huantt/kafka-dump/pkg/log"
	"github.com/spf13/cobra"
	"time"
)

func CreateStreamCmd() (*cobra.Command, error) {
	var fromKafkaServers string
	var fromKafkaUsername string
	var fromKafkaPassword string
	var fromKafkaSecurityProtocol string
	var fromKafkaSASLMechanism string
	var fromKafkaGroupID string

	var toKafkaServers string
	var toKafkaUsername string
	var toKafkaPassword string
	var toKafkaSecurityProtocol string
	var toKafkaSASLMechanism string

	var fromTopic string
	var toTopic string

	var maxWaitingSecondsForNewMessage int

	command := cobra.Command{
		Use: "stream",
		Run: func(cmd *cobra.Command, args []string) {
			kafkaConsumerConfig := kafka_utils.Config{
				BootstrapServers: fromKafkaServers,
				SecurityProtocol: fromKafkaSecurityProtocol,
				SASLMechanism:    fromKafkaSASLMechanism,
				SASLUsername:     fromKafkaUsername,
				SASLPassword:     fromKafkaPassword,
				GroupId:          fromKafkaGroupID,
			}
			consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
			if err != nil {
				panic(err)
			}
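			// The producer targets the destination cluster, so it is configured from the to-* flags.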
			kafkaProducerConfig := kafka_utils.Config{
				BootstrapServers: toKafkaServers,
				SecurityProtocol: toKafkaSecurityProtocol,
				SASLMechanism:    toKafkaSASLMechanism,
				SASLUsername:     toKafkaUsername,
				SASLPassword:     toKafkaPassword,
			}
			producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
			if err != nil {
				panic(err)
			}
			queueBufferingMaxMessages := kafka_utils.DefaultQueueBufferingMaxMessages
			if kafkaProducerConfig.QueueBufferingMaxMessages > 0 {
				queueBufferingMaxMessages = kafkaProducerConfig.QueueBufferingMaxMessages
			}
			deliveryChan := make(chan kafka.Event, queueBufferingMaxMessages)
			go func() { // Tricky: the Kafka client requires a dedicated delivery channel for Flush to work
				for e := range deliveryChan {
					m := e.(*kafka.Message)
					if m.TopicPartition.Error != nil {
						panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
					} else {
						log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
							*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
					}
				}
			}()
			maxWaitingTimeForNewMessage := time.Duration(maxWaitingSecondsForNewMessage) * time.Second
			streamer := impl.NewStreamer(
				consumer,
				producer,
				fromTopic,
				toTopic,
				deliveryChan,
				impl.StreamerOptions{MaxWaitingTimeForNewMessage: &maxWaitingTimeForNewMessage},
			)
			transferredCount, err := streamer.Run()
			if err != nil {
				panic(err)
			}
			log.Infof("Transferred %d messages from %s to %s", transferredCount, fromTopic, toTopic)
		},
	}
	command.Flags().StringVar(&fromKafkaServers, "from-kafka-servers", "", "Source Kafka servers string")
	command.Flags().StringVar(&fromKafkaUsername, "from-kafka-username", "", "Source Kafka username")
	command.Flags().StringVar(&fromKafkaPassword, "from-kafka-password", "", "Source Kafka password")
	command.Flags().StringVar(&fromKafkaSASLMechanism, "from-kafka-sasl-mechanism", "", "Source Kafka SASL mechanism")
	command.Flags().StringVar(&fromKafkaSecurityProtocol, "from-kafka-security-protocol", "", "Source Kafka security protocol")
	command.Flags().StringVar(&fromKafkaGroupID, "from-kafka-group-id", "", "Source Kafka consumer group ID")
	command.Flags().StringVar(&fromTopic, "from-topic", "", "Source topic")

	command.Flags().StringVar(&toKafkaServers, "to-kafka-servers", "", "Destination Kafka servers string")
	command.Flags().StringVar(&toKafkaUsername, "to-kafka-username", "", "Destination Kafka username")
	command.Flags().StringVar(&toKafkaPassword, "to-kafka-password", "", "Destination Kafka password")
	command.Flags().StringVar(&toKafkaSASLMechanism, "to-kafka-sasl-mechanism", "", "Destination Kafka SASL mechanism")
	command.Flags().StringVar(&toKafkaSecurityProtocol, "to-kafka-security-protocol", "", "Destination Kafka security protocol")
	command.Flags().StringVar(&toTopic, "to-topic", "", "Destination topic")

	command.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max waiting seconds for a new message, after which this process is marked as finished. Set -1 to wait forever.")

	command.MarkFlagsRequiredTogether("from-kafka-username", "from-kafka-password", "from-kafka-sasl-mechanism", "from-kafka-security-protocol")
	command.MarkFlagsRequiredTogether("to-kafka-username", "to-kafka-password", "to-kafka-sasl-mechanism", "to-kafka-security-protocol")
	command.MarkFlagsRequiredTogether("from-topic", "from-kafka-servers", "from-kafka-group-id")
	command.MarkFlagsRequiredTogether("to-topic", "to-kafka-servers")

	err := command.MarkFlagRequired("from-topic")
	if err != nil {
		return nil, err
	}
	err = command.MarkFlagRequired("to-topic")
	if err != nil {
		return nil, err
	}
	return &command, nil
}
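The updated main.go is among the seven changed files but its diff did not load in this view. A minimal sketch of how these constructors might be wired together (the root command name and error handling are assumptions, and the README's global --log-level flag is omitted):

```go
package main

import (
	"github.com/huantt/kafka-dump/cmd"
	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "kafka-dump"}
	constructors := []func() (*cobra.Command, error){
		cmd.CreateExportCommand,
		cmd.CreateImportCmd,
		cmd.CreateStreamCmd,
		cmd.CreateCountParquetRowCommand,
	}
	for _, construct := range constructors {
		subCommand, err := construct() // each constructor reports flag-registration errors
		if err != nil {
			panic(err)
		}
		rootCmd.AddCommand(subCommand)
	}
	if err := rootCmd.Execute(); err != nil {
		panic(err)
	}
}
```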