Skip to content

Commit

Permalink
Kafka relay support
Browse files Browse the repository at this point in the history
  • Loading branch information
blinktag authored Jan 11, 2021
2 parents ca79c74 + 218b6fc commit edb4fda
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 17 deletions.
108 changes: 106 additions & 2 deletions backends/kafka/relay.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,111 @@
package kafka

import "github.com/batchcorp/plumber/cli"
import (
"context"

"github.com/jhump/protoreflect/desc"
"github.com/pkg/errors"
"github.com/relistan/go-director"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/api"
"github.com/batchcorp/plumber/backends/kafka/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
)

// Relayer reads messages from a Kafka topic and forwards them onto RelayCh,
// where they are picked up by the relay workers (see relay.Run).
type Relayer struct {
	Options *cli.Options // parsed CLI options (topic, broker address, credentials, ...)
	MsgDesc *desc.MessageDescriptor // protobuf message descriptor; never set in this file — presumably populated by callers, TODO confirm
	RelayCh chan interface{} // channel shared with the gRPC relay workers; receives *types.RelayMessage
	log *logrus.Entry // logger scoped to this package ("pkg" = "kafka/relay")
	Looper *director.FreeLooper // loop controller; created in Relay() but not used by (*Relayer).Relay — TODO confirm intent
	DefaultContext context.Context // context passed to Kafka reads; context.Background() in Relay()
}

// IKafkaRelayer describes the ability to relay Kafka messages; implemented by
// *Relayer. NOTE(review): Go convention would name this Relayer-style with an
// -er suffix at the consumer, but the name is part of the public interface.
type IKafkaRelayer interface {
	Relay() error
}

var (
	// errMissingTopic is returned by validateRelayOptions when no Kafka topic
	// was supplied. Lowercased per Go error-string convention.
	errMissingTopic = errors.New("you must specify a topic")
)

// Relay sets up a new Kafka relayer, starts the gRPC relay workers and the
// HTTP API server, then blocks reading messages from Kafka and pushing them
// onto the relay channel. It only returns on a setup failure or a read error
// surfaced by (*Relayer).Relay.
//
// BUG FIX: the original body began with panic("not implemented"), which made
// every statement below it unreachable and the relay feature dead on arrival.
func Relay(opts *cli.Options) error {
	if err := validateRelayOptions(opts); err != nil {
		return errors.Wrap(err, "unable to verify options")
	}

	// Create new relayer instance (+ validate token & gRPC address)
	relayCfg := &relay.Config{
		Token:       opts.RelayToken,
		GRPCAddress: opts.RelayGRPCAddress,
		NumWorkers:  opts.RelayNumWorkers,
		Timeout:     opts.RelayGRPCTimeout,
		RelayCh:     make(chan interface{}, 1),
		DisableTLS:  opts.RelayGRPCDisableTLS,
	}

	grpcRelayer, err := relay.New(relayCfg)
	if err != nil {
		return errors.Wrap(err, "unable to create new gRPC relayer")
	}

	// Launch the HTTP API server in the background; a failure to bind is fatal
	// since relay mode is useless without its control API.
	go func() {
		if err := api.Start(opts.RelayHTTPListenAddress, opts.Version); err != nil {
			logrus.Fatalf("unable to start API server: %s", err)
		}
	}()

	if err := grpcRelayer.StartWorkers(); err != nil {
		return errors.Wrap(err, "unable to start gRPC relay workers")
	}

	r := &Relayer{
		Options:        opts,
		RelayCh:        relayCfg.RelayCh,
		log:            logrus.WithField("pkg", "kafka/relay"),
		Looper:         director.NewFreeLooper(director.FOREVER, make(chan error)),
		DefaultContext: context.Background(),
	}

	return r.Relay()
}

// validateRelayOptions verifies that every CLI option required for Kafka
// relay mode is present; currently only the topic is mandatory.
func validateRelayOptions(opts *cli.Options) error {
	if len(opts.Kafka.Topic) == 0 {
		return errMissingTopic
	}

	return nil
}

// Relay reads messages from Kafka and sends them to RelayCh, which is then
// read by relay.Run(). It blocks forever under normal operation and returns
// only when the read context is cancelled or times out.
//
// FIX: the original loop treated every read error as transient and continued.
// With a cancelled/expired context, ReadMessage fails immediately on every
// call, so the loop would spin and log at full speed forever. Context errors
// now terminate the loop (a no-op change for the current context.Background()
// usage, which is never cancelled).
func (r *Relayer) Relay() error {
	r.log.Infof("Relaying Kafka messages from '%s' topic -> '%s'",
		r.Options.Kafka.Topic, r.Options.RelayGRPCAddress)

	r.log.Infof("HTTP server listening on '%s'", r.Options.RelayHTTPListenAddress)

	reader, err := NewReader(r.Options)
	if err != nil {
		return err
	}
	// These defers only run once the loop below exits (context cancellation).
	defer reader.Reader.Close()
	defer reader.Conn.Close()

	for {
		msg, err := reader.Reader.ReadMessage(r.DefaultContext)
		if err != nil {
			// A dead context will never recover; bail out instead of spinning.
			if err == context.Canceled || err == context.DeadlineExceeded {
				return errors.Wrap(err, "context terminated while reading from kafka")
			}

			r.log.Errorf("Unable to read message: %s", err)
			continue
		}

		r.log.Infof("Writing Kafka message to relay channel: %s", msg.Value)

		r.RelayCh <- &types.RelayMessage{
			Value:   &msg,
			Options: &types.RelayMessageOptions{},
		}
	}
}
15 changes: 15 additions & 0 deletions backends/kafka/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package types

import (
"github.com/segmentio/kafka-go"
)

// RelayMessage encapsulates a kafka message that is read by relay.Run()
type RelayMessage struct {
	Value *kafka.Message // the raw message as read from the Kafka topic
	Options *RelayMessageOptions // per-message relay options (currently empty)
}

// RelayMessageOptions contains any additional options necessary for processing
// of Kafka messages by the relayer. Intentionally empty for now; it exists so
// per-message options can be added later without changing RelayMessage's shape.
type RelayMessageOptions struct {
}
29 changes: 21 additions & 8 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -93,13 +94,25 @@ func Handle(cliArgs []string) (string, *Options, error) {
writeCmd := app.Command("write", "Write message(s) to messaging system")
relayCmd := app.Command("relay", "Relay message(s) from messaging system to Batch")

HandleRelayFlags(relayCmd, opts)
HandleKafkaFlags(readCmd, writeCmd, opts)
HandleRabbitFlags(readCmd, writeCmd, relayCmd, opts)
HandleGCPPubSubFlags(readCmd, writeCmd, opts)
HandleMQTTFlags(readCmd, writeCmd, opts)
HandleAWSSQSFlags(readCmd, writeCmd, relayCmd, opts)
HandleActiveMqFlags(readCmd, writeCmd, opts)
switch os.Getenv("PLUMBER_RELAY_TYPE") {
case "kafka":
HandleRelayFlags(relayCmd, opts)
HandleKafkaFlags(readCmd, writeCmd, relayCmd, opts)
case "rabbit":
HandleRelayFlags(relayCmd, opts)
HandleRabbitFlags(readCmd, writeCmd, relayCmd, opts)
case "aws-sqs":
HandleRelayFlags(relayCmd, opts)
HandleAWSSQSFlags(readCmd, writeCmd, relayCmd, opts)
default:
HandleRelayFlags(relayCmd, opts)
HandleKafkaFlags(readCmd, writeCmd, relayCmd, opts)
HandleRabbitFlags(readCmd, writeCmd, relayCmd, opts)
HandleGCPPubSubFlags(readCmd, writeCmd, opts)
HandleMQTTFlags(readCmd, writeCmd, opts)
HandleAWSSQSFlags(readCmd, writeCmd, relayCmd, opts)
HandleActiveMqFlags(readCmd, writeCmd, opts)
}

HandleGlobalFlags(readCmd, opts)
HandleGlobalReadFlags(readCmd, opts)
Expand Down Expand Up @@ -173,7 +186,7 @@ func HandleGlobalFlags(cmd *kingpin.CmdClause, opts *Options) {
func HandleRelayFlags(relayCmd *kingpin.CmdClause, opts *Options) {
relayCmd.Flag("type", "Type of collector to use. Ex: rabbit, kafka, aws-sqs").
Envar("PLUMBER_RELAY_TYPE").
EnumVar(&opts.RelayType, "aws-sqs", "rabbit")
EnumVar(&opts.RelayType, "aws-sqs", "rabbit", "kafka")

relayCmd.Flag("token", "Collection token to use when sending data to Batch").
Required().
Expand Down
45 changes: 38 additions & 7 deletions cli/kafka.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cli

import (
"os"
"time"

"gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -28,7 +29,7 @@ type KafkaOptions struct {
WriteKey string
}

func HandleKafkaFlags(readCmd, writeCmd *kingpin.CmdClause, opts *Options) {
func HandleKafkaFlags(readCmd, writeCmd, relayCmd *kingpin.CmdClause, opts *Options) {
rc := readCmd.Command("kafka", "Kafka message system")

addSharedKafkaFlags(rc, opts)
Expand All @@ -39,17 +40,47 @@ func HandleKafkaFlags(readCmd, writeCmd *kingpin.CmdClause, opts *Options) {

addSharedKafkaFlags(wc, opts)
addWriteKafkaFlags(wc, opts)

// If PLUMBER_RELAY_TYPE is set, use env vars, otherwise use CLI flags
relayType := os.Getenv("PLUMBER_RELAY_TYPE")

var rec *kingpin.CmdClause

if relayType != "" {
rec = relayCmd
} else {
rec = relayCmd.Command("kafka", "Kafka message system")
}

addSharedKafkaFlags(rec, opts)
addReadKafkaFlags(rec, opts)
}

func addSharedKafkaFlags(cmd *kingpin.CmdClause, opts *Options) {
cmd.Flag("address", "Destination host address").Default("localhost:9092").StringVar(&opts.Kafka.Address)
cmd.Flag("topic", "Topic to read message(s) from").Required().StringVar(&opts.Kafka.Topic)
cmd.Flag("address", "Destination host address").
Default("localhost:9092").
Envar("PLUMBER_RELAY_KAFKA_ADDRESS").
StringVar(&opts.Kafka.Address)
cmd.Flag("topic", "Topic to read message(s) from").
Required().
Envar("PLUMBER_RELAY_KAFKA_TOPIC").
StringVar(&opts.Kafka.Topic)
cmd.Flag("timeout", "Connect timeout").Default(KafkaDefaultConnectTimeout).
Envar("PLUMBER_RELAY_KAFKA_TIMEOUT").
DurationVar(&opts.Kafka.Timeout)
cmd.Flag("insecure-tls", "Use insecure TLS").BoolVar(&opts.Kafka.InsecureTLS)
cmd.Flag("username", "SASL Username").StringVar(&opts.Kafka.Username)
cmd.Flag("password", "SASL Password. If omitted, you will be prompted for the password").StringVar(&opts.Kafka.Password)
cmd.Flag("auth-type", "SASL Authentication type (plain or scram)").Default("scram").StringVar(&opts.Kafka.AuthenticationType)
cmd.Flag("insecure-tls", "Use insecure TLS").
Envar("PLUMBER_RELAY_KAFKA_INSECURE_TLS").
BoolVar(&opts.Kafka.InsecureTLS)
cmd.Flag("username", "SASL Username").
Envar("PLUMBER_RELAY_KAFKA_USERNAME").
StringVar(&opts.Kafka.Username)
cmd.Flag("password", "SASL Password. If omitted, you will be prompted for the password").
Envar("PLUMBER_RELAY_KAFKA_PASSWORD").
StringVar(&opts.Kafka.Password)
cmd.Flag("auth-type", "SASL Authentication type (plain or scram)").
Default("scram").
Envar("PLUMBER_RELAY_KAFKA_SASL_TYPE").
StringVar(&opts.Kafka.AuthenticationType)

}

Expand Down
65 changes: 65 additions & 0 deletions relay/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package relay

import (
"context"
"time"

"github.com/batchcorp/schemas/build/go/events/records"
"github.com/batchcorp/schemas/build/go/services"
"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
"google.golang.org/grpc"

"github.com/batchcorp/plumber/backends/kafka/types"
)

var (
	// errMissingMessage is returned when a nil *types.RelayMessage reaches the handler.
	errMissingMessage = errors.New("msg cannot be nil")
	// errMissingMessageValue is returned when the wrapped kafka.Message is nil.
	errMissingMessageValue = errors.New("msg.Value cannot be nil")
)

// handleKafka validates a single Kafka relay message, converts it to its
// protobuf representation and submits it to the Batch GRPC collector over
// the supplied connection.
func (r *Relay) handleKafka(ctx context.Context, conn *grpc.ClientConn, msg *types.RelayMessage) error {
	if err := r.validateKafkaRelayMessage(msg); err != nil {
		return errors.Wrap(err, "unable to validate kafka relay message")
	}

	record := convertKafkaMessageToProtobufRecord(msg.Value)

	request := &services.KafkaSinkRecordRequest{
		Records: []*records.KafkaSinkRecord{record},
	}

	if _, err := services.NewGRPCCollectorClient(conn).AddKafkaRecord(ctx, request); err != nil {
		// Dump the offending record to aid debugging before bubbling the error up.
		r.log.Errorf("%+v", record)
		return errors.Wrap(err, "unable to complete AddKafkaRecord call")
	}

	r.log.Debug("successfully handled kafka message")

	return nil
}

// validateKafkaRelayMessage ensures all necessary values are present for a
// Kafka relay message: the wrapper itself and the wrapped kafka.Message must
// both be non-nil.
func (r *Relay) validateKafkaRelayMessage(msg *types.RelayMessage) error {
	switch {
	case msg == nil:
		return errMissingMessage
	case msg.Value == nil:
		return errMissingMessageValue
	default:
		return nil
	}
}

// convertKafkaMessageToProtobufRecord creates a records.KafkaSinkRecord from a kafka.Message which can then
// be sent to the GRPC server
//
// NOTE(review): Timestamp is the relay-side wall clock (time.Now), not the
// broker's msg.Time — confirm whether the collector expects ingest time or the
// original message timestamp.
func convertKafkaMessageToProtobufRecord(msg *kafka.Message) *records.KafkaSinkRecord {
	return &records.KafkaSinkRecord{
		Topic: msg.Topic,
		Key: msg.Key,
		Value: msg.Value,
		// UnixNano in UTC at conversion time, not the Kafka message timestamp.
		Timestamp: time.Now().UTC().UnixNano(),
		Offset: msg.Offset,
		Partition: int32(msg.Partition),
	}
}
4 changes: 4 additions & 0 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"google.golang.org/grpc/metadata"

sqsTypes "github.com/batchcorp/plumber/backends/aws-sqs/types"
kafkatypes "github.com/batchcorp/plumber/backends/kafka/types"
rabbitTypes "github.com/batchcorp/plumber/backends/rabbitmq/types"
)

Expand Down Expand Up @@ -169,6 +170,9 @@ func (r *Relay) Run(id int, conn *grpc.ClientConn, ctx context.Context) {
case *rabbitTypes.RelayMessage:
r.log.Debugf("Run() received rabbit message %+v", v)
err = r.handleRabbit(ctx, conn, v)
case *kafkatypes.RelayMessage:
r.log.Debugf("Run() received kafka message %+v", v)
err = r.handleKafka(ctx, conn, v)
default:
r.log.WithField("type", v).Error("received unknown message type - skipping")
continue
Expand Down

0 comments on commit edb4fda

Please sign in to comment.