feat: implement topic-to-topic streamer

huantt committed Nov 26, 2022
1 parent c24a480 commit 2705e4d
Showing 4 changed files with 228 additions and 7 deletions.
32 changes: 32 additions & 0 deletions README.md
@@ -77,6 +77,38 @@ kafka-dump import \
--kafka-sasl-mechanism=PLAIN
```

### Stream messages from topic to topic
```shell
Flags:
-h, --help help for stream
--kafka-group-id string Kafka consumer group ID
--kafka-password string Kafka password
      --kafka-sasl-mechanism string                  Kafka SASL mechanism
--kafka-security-protocol string Kafka security protocol
--kafka-servers string Kafka servers string
--kafka-username string Kafka username
      --max-waiting-seconds-for-new-message int     Max seconds to wait for a new message before the process finishes. Set -1 to wait forever. (default 30)
--topic-from string Source topic
--topic-to string Destination topic

Global Flags:
--log-level string Log level (default "info")

```

#### Sample
```shell
kafka-dump stream \
--topic-from=users \
--topic-to=new-users \
--kafka-group-id=stream \
--kafka-servers=localhost:9092 \
--kafka-username=admin \
--kafka-password=admin \
--kafka-security-protocol=SASL_SSL \
--kafka-sasl-mechanism=PLAIN
```
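
The SASL flags are marked as required together, so against a local broker with no authentication they can presumably all be omitted:
```shell
kafka-dump stream \
--topic-from=users \
--topic-to=new-users \
--kafka-group-id=stream \
--kafka-servers=localhost:9092
```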

### Count number of rows in parquet file
```shell
Usage:
92 changes: 92 additions & 0 deletions impl/streamer.go
@@ -0,0 +1,92 @@
package impl

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/huantt/kafka-dump/pkg/log"
"github.com/pkg/errors"
"os"
"os/signal"
"syscall"
"time"
)

type Streamer struct {
consumer *kafka.Consumer
producer *kafka.Producer
topicFrom string
topicTo string
deliveryChan chan kafka.Event
options StreamerOptions
}

func NewStreamer(consumer *kafka.Consumer, producer *kafka.Producer, topicFrom string, topicTo string, deliveryChan chan kafka.Event, options StreamerOptions) *Streamer {
return &Streamer{
consumer: consumer,
producer: producer,
topicFrom: topicFrom,
topicTo: topicTo,
deliveryChan: deliveryChan,
options: options,
}
}

type StreamerOptions struct {
MaxWaitingTimeForNewMessage *time.Duration
}

func (s *Streamer) Run() (transferredCount int64, err error) {
err = s.consumer.Subscribe(s.topicFrom, nil)
if err != nil {
return transferredCount, errors.Wrapf(err, "Failed to subscribe to topic %s", s.topicFrom)
}
log.Infof("Subscribed topics: %s", s.topicFrom)
{
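// Flush queued messages on SIGINT/SIGTERM as well as on normal return.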
cx := make(chan os.Signal, 1)
signal.Notify(cx, os.Interrupt, syscall.SIGTERM)
go func() {
<-cx
if err := s.onShutdown(); err != nil {
panic(err)
}
os.Exit(1)
}()
defer func() {
// Don't let a clean shutdown overwrite the error being returned by Run.
if shutdownErr := s.onShutdown(); shutdownErr != nil && err == nil {
err = shutdownErr
}
}()
}

maxWaitingTimeForNewMessage := defaultMaxWaitingTimeForNewMessage
if s.options.MaxWaitingTimeForNewMessage != nil {
maxWaitingTimeForNewMessage = *s.options.MaxWaitingTimeForNewMessage
}
for {
msg, err := s.consumer.ReadMessage(maxWaitingTimeForNewMessage)
if err != nil {
if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
log.Infof("Waited %s but no new messages arrived. Finishing.", maxWaitingTimeForNewMessage)
// A timeout is the expected way to finish, not a failure.
return transferredCount, nil
}
return transferredCount, err
}

msg.TopicPartition = kafka.TopicPartition{
Topic: &s.topicTo,
Partition: kafka.PartitionAny,
}
err = s.producer.Produce(msg, s.deliveryChan)
if err != nil {
return transferredCount, errors.Wrapf(err, "Failed to produce message: %s", string(msg.Value))
}
transferredCount++
log.Infof("Transferred %d messages", transferredCount)
}
}

func (s *Streamer) onShutdown() error {
log.Infof("Flushing locally queued messages to kafka...")
if remaining := s.producer.Flush(30 * 1000); remaining > 0 {
return errors.Errorf("%d message(s) still unflushed after 30s", remaining)
}
return nil
}
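
`defaultMaxWaitingTimeForNewMessage` is referenced in `Run` but not declared in this file, so it presumably lives elsewhere in package `impl`; a sketch of the assumed declaration:
```go
// Assumed declaration elsewhere in package impl (not shown in this diff);
// 30 seconds matches the CLI flag's default.
const defaultMaxWaitingTimeForNewMessage = 30 * time.Second
```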
84 changes: 83 additions & 1 deletion main.go
@@ -183,7 +183,88 @@ func run(args []string) error {
importCmd.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
importCmd.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
importCmd.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
err = importCmd.MarkFlagRequired("file")
if err != nil {
return err
}

// Init stream command
var topicFrom string
var topicTo string
streamCmd := cobra.Command{
Use: "stream",
Run: func(cmd *cobra.Command, args []string) {
//TODO: Separate consumer & producer config
kafkaConsumerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
GroupId: kafkaGroupID,
}
consumer, err := kafka_utils.NewConsumer(kafkaConsumerConfig)
if err != nil {
panic(err)
}
kafkaProducerConfig := kafka_utils.Config{
BootstrapServers: kafkaServers,
SecurityProtocol: kafkaSecurityProtocol,
SASLMechanism: kafkaSASKMechanism,
SASLUsername: kafkaUsername,
SASLPassword: kafkaPassword,
}
producer, err := kafka_utils.NewProducer(kafkaProducerConfig)
if err != nil {
panic(err)
}
queueBufferingMaxMessages := kafka_utils.DefaultQueueBufferingMaxMessages
if kafkaProducerConfig.QueueBufferingMaxMessages > 0 {
queueBufferingMaxMessages = kafkaProducerConfig.QueueBufferingMaxMessages
}
deliveryChan := make(chan kafka.Event, queueBufferingMaxMessages)
go func() { // Tricky: delivery reports arrive on this dedicated channel and must be drained, otherwise the producer's queue never empties and Flush blocks
for e := range deliveryChan {
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
panic(fmt.Sprintf("Failed to deliver message: %v\n", m.TopicPartition))
} else {
log.Debugf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
}
}()
maxWaitingTimeForNewMessage := time.Duration(maxWaitingSecondsForNewMessage) * time.Second
streamer := impl.NewStreamer(
consumer,
producer,
topicFrom,
topicTo,
deliveryChan,
impl.StreamerOptions{MaxWaitingTimeForNewMessage: &maxWaitingTimeForNewMessage},
)
transferredCount, err := streamer.Run()
if err != nil {
panic(err)
}
log.Infof("Transferred %d messages from %s to %s", transferredCount, topicFrom, topicTo)
},
}
streamCmd.Flags().StringVar(&kafkaServers, "kafka-servers", "", "Kafka servers string")
streamCmd.Flags().StringVar(&kafkaUsername, "kafka-username", "", "Kafka username")
streamCmd.Flags().StringVar(&kafkaPassword, "kafka-password", "", "Kafka password")
streamCmd.Flags().StringVar(&kafkaSASKMechanism, "kafka-sasl-mechanism", "", "Kafka password")
streamCmd.Flags().StringVar(&kafkaSecurityProtocol, "kafka-security-protocol", "", "Kafka security protocol")
streamCmd.Flags().StringVar(&kafkaGroupID, "kafka-group-id", "", "Kafka consumer group ID")
streamCmd.Flags().StringVar(&topicFrom, "topic-from", "", "Source topic")
streamCmd.Flags().StringVar(&topicTo, "topic-to", "", "Destination topic")
streamCmd.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max seconds to wait for a new message before the process finishes. Set -1 to wait forever.")
streamCmd.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")

err = streamCmd.MarkFlagRequired("topic-from")
if err != nil {
return err
}
err = streamCmd.MarkFlagRequired("topic-to")
if err != nil {
return err
}
@@ -192,6 +192,7 @@ func run(args []string) error {
&exportCmd,
&importCmd,
&countParquetNumberOfRowsCmd,
&streamCmd,
)

return rootCmd.Execute()
27 changes: 21 additions & 6 deletions pkg/kafka_utils/consumer.go
@@ -3,16 +3,31 @@ package kafka_utils
import "github.com/confluentinc/confluent-kafka-go/kafka"

  func NewConsumer(cfg Config) (*kafka.Consumer, error) {
- consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+ config := &kafka.ConfigMap{
  "bootstrap.servers": cfg.BootstrapServers,
- "security.protocol": cfg.SecurityProtocol,
- "sasl.mechanism": cfg.SASLMechanism,
- "sasl.username": cfg.SASLUsername,
- "sasl.password": cfg.SASLPassword,
  "enable.auto.commit": false,
  "auto.offset.reset": "earliest",
  "group.id": cfg.GroupId,
- })
+ }
+ if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
+ err := config.SetKey("security.protocol", cfg.SecurityProtocol)
+ if err != nil {
+ return nil, err
+ }
+ err = config.SetKey("sasl.mechanism", cfg.SASLMechanism)
+ if err != nil {
+ return nil, err
+ }
+ err = config.SetKey("sasl.username", cfg.SASLUsername)
+ if err != nil {
+ return nil, err
+ }
+ err = config.SetKey("sasl.password", cfg.SASLPassword)
+ if err != nil {
+ return nil, err
+ }
+ }
+ consumer, err := kafka.NewConsumer(config)
  if err != nil {
  return nil, err
  }
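
`pkg/kafka_utils/producer.go` is not part of this diff, but `NewProducer` is called from `main.go`, so the same all-or-nothing SASL guard presumably applies there too. A minimal sketch, assuming the producer config mirrors the consumer's:
```go
package kafka_utils

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Sketch only: assumes NewProducer mirrors NewConsumer's optional-SASL handling.
// The real implementation presumably also applies cfg.QueueBufferingMaxMessages.
func NewProducer(cfg Config) (*kafka.Producer, error) {
	config := &kafka.ConfigMap{
		"bootstrap.servers": cfg.BootstrapServers,
	}
	if cfg.SecurityProtocol != "" && cfg.SASLMechanism != "" && cfg.SASLUsername != "" && cfg.SASLPassword != "" {
		for key, value := range map[string]kafka.ConfigValue{
			"security.protocol": cfg.SecurityProtocol,
			"sasl.mechanism":    cfg.SASLMechanism,
			"sasl.username":     cfg.SASLUsername,
			"sasl.password":     cfg.SASLPassword,
		} {
			if err := config.SetKey(key, value); err != nil {
				return nil, err
			}
		}
	}
	return kafka.NewProducer(config)
}
```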
