Skip to content

Commit

Permalink
Merge pull request #71 from batchcorp/dselans/improve-kafka-relay-thr…
Browse files Browse the repository at this point in the history
…oughput

Dselans/improve kafka relay throughput
  • Loading branch information
dselans authored Jan 29, 2021
2 parents 6c82758 + 6a15104 commit c3ddfee
Show file tree
Hide file tree
Showing 38 changed files with 877 additions and 263 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/master-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand All @@ -17,12 +20,15 @@ jobs:
time: '30s'
- name: Master build tests
run: |
make test
make test
functional:
name: Run functional tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand All @@ -35,4 +41,4 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
make test/functional
make test/functional
8 changes: 7 additions & 1 deletion .github/workflows/pr-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand All @@ -20,6 +23,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.15.4' # The Go version to download (if necessary) and use.
- name: Start up dependencies
run: docker-compose up -d
- name: Wait for dependencies to start up
Expand All @@ -32,4 +38,4 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
make test/functional
make test/functional
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
/build
.DS*
test-assets/backend-data/*
start-*.sh
32 changes: 17 additions & 15 deletions ENV.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ Environment Variables
| `PLUMBER_RELAY_GRPC_TIMEOUT` | gRPC collector timeout | `10s` |
| `PLUMBER_RELAY_NUM_WORKERS` | Number of relay workers | `10` |
| `PLUMBER_RELAY_HTTP_LISTEN_ADDRESS` | Alternative listen address for local HTTP server | `:8080` |

| `PLUMBER_RELAY_BATCH_SIZE` | How many messages to batch before sending them to grpc-collector | `10` |
| `PLUMBER_STATS` | Display periodic consumer/producer stats | `false` |
| `PLUMBER_STATS_REPORT_INTERVAL` | Interval at which periodic stats are displayed | `5s` |

## SQS

Expand All @@ -25,7 +27,6 @@ Environment Variables
| `PLUMBER_RELAY_SQS_RECEIVE_REQUEST_ATTEMPT_ID` | An id to identify this read request by | `plumber` |
| `PLUMBER_RELAY_SQS_AUTO_DELETE` | Delete read/received messages | `false` |
| `PLUMBER_RELAY_SQS_WAIT_TIME_SECONDS` | Number of seconds to wait for messages (not used when using 'follow') |
|

## RabbitMQ

Expand All @@ -48,19 +49,20 @@ Environment Variables

| Environment Variable | Description | Default |
| --------------------- | ------------| ------- |
| `PLUMBER_RELAY_KAFKA_ADDRESS` | Destination host address | `localhost:9092` |
| `PLUMBER_RELAY_KAFKA_TOPIC` | Topic to read message(s) from | **REQUIRED** |
| `PLUMBER_RELAY_KAFKA_TIMEOUT` | Connect timeout | `10s` |
| `PLUMBER_RELAY_KAFKA_INSECURE_TLS` | Use insecure TLS (ie. do not verify cert) | `false` |
| `PLUMBER_RELAY_KAFKA_USERNAME` | SASL Username | |
| `PLUMBER_RELAY_KAFKA_PASSWORD` | SASL Password. If omitted, you will be prompted for the password | |
| `PLUMBER_RELAY_KAFKA_SASL_TYPE` | SASL Authentication type (plain or scram) | `scram` |
| `PLUMBER_RELAY_GROUP_ID` | Specify a specific group-id to use when reading from kafka | `plumber` |
| `PLUMBER_RELAY_MAX_WAIT` | How long to wait for new data when reading batches of messages | `1s` |
| `PLUMBER_RELAY_MIN_BYTES` | Minimum number of bytes to fetch in a single kafka request (throughput optimization) | `1` |
| `PLUMBER_RELAY_MAX_BYTES` | Maximum number of bytes to fetch in a single kafka request (throughput optimization) | `1` |
| `PLUMBER_RELAY_QUEUE_CAPACITY` | Internal queue capacity (throughput optimization) | `1` |
| `PLUMBER_RELAY_REBALANCE_TIMEOUT` | How long a coordinator will wait for member joins as part of a rebalance | `0` |
| `PLUMBER_RELAY_KAFKA_ADDRESS` | Destination host address | `localhost:9092` |
| `PLUMBER_RELAY_KAFKA_TOPIC` | Topic to read message(s) from | **REQUIRED** |
| `PLUMBER_RELAY_KAFKA_TIMEOUT` | Connect timeout | `10s` |
| `PLUMBER_RELAY_KAFKA_INSECURE_TLS` | Use insecure TLS (ie. do not verify cert) | `false` |
| `PLUMBER_RELAY_KAFKA_USERNAME` | SASL Username | |
| `PLUMBER_RELAY_KAFKA_PASSWORD` | SASL Password. If omitted, you will be prompted for the password | |
| `PLUMBER_RELAY_KAFKA_SASL_TYPE` | SASL Authentication type (plain or scram) | `scram` |
| `PLUMBER_RELAY_KAFKA_GROUP_ID` | Specify a specific group-id to use when reading from kafka | `plumber` |
| `PLUMBER_RELAY_KAFKA_MAX_WAIT` | How long to wait for new data when reading batches of messages | `1s` |
| `PLUMBER_RELAY_KAFKA_MIN_BYTES` | Minimum number of bytes to fetch in a single kafka request (throughput optimization) | `1` |
| `PLUMBER_RELAY_KAFKA_MAX_BYTES` | Maximum number of bytes to fetch in a single kafka request (throughput optimization) | `1` |
| `PLUMBER_RELAY_KAFKA_QUEUE_CAPACITY` | Internal queue capacity (throughput optimization) | `1` |
| `PLUMBER_RELAY_KAFKA_REBALANCE_TIMEOUT` | How long a coordinator will wait for member joins as part of a rebalance | `0` |
| `PLUMBER_RELAY_KAFKA_COMMIT_INTERVAL` | How often to commit offsets to broker (0 = synchronous) | `5s` |

**NOTE**: For _Confluent-hosted_ Kafka, you MUST set:

Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ $ plumber relay --help

## Features

* Dynamic protobuf encode & decode
* Gzip decompress
* Dynamic protobuf & avro encode & decode
* Gzip compress & decompress
* `--follow` support (ie. `tail -f`)
* Relay and archive all messaging system data
* Observe, relay and archive messaging data
* Support for **most** messaging systems
* Single-binary, zero-config, easy-install

## Hmm, what is this Batch thing?
Expand Down
4 changes: 4 additions & 0 deletions backends/aws-sqs/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/batchcorp/plumber/backends/aws-sqs/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
"github.com/batchcorp/plumber/stats"
)

type Relayer struct {
Expand All @@ -33,6 +34,7 @@ func Relay(opts *cli.Options) error {
Timeout: opts.RelayGRPCTimeout,
RelayCh: make(chan interface{}, 1),
DisableTLS: opts.RelayGRPCDisableTLS,
BatchSize: opts.RelayBatchSize,
}

grpcRelayer, err := relay.New(relayCfg)
Expand Down Expand Up @@ -104,6 +106,8 @@ func (r *Relayer) Relay() error {

// Send message(s) to relayer
for _, v := range msgResult.Messages {
stats.Incr("sqs-relay-consumer", 1)

r.log.Debug("Writing message to relay channel")

r.RelayCh <- &types.RelayMessage{
Expand Down
3 changes: 3 additions & 0 deletions backends/azure/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/batchcorp/plumber/backends/azure/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
"github.com/batchcorp/plumber/stats"
)

type Relayer struct {
Expand Down Expand Up @@ -142,6 +143,8 @@ func (r *Relayer) Relay() error {
msg.UserProperties["plumber_subscription"] = r.Options.Azure.Subscription
}

stats.Incr("azure-relay-consumer", 1)

r.RelayCh <- &types.RelayMessage{
Value: msg,
}
Expand Down
4 changes: 3 additions & 1 deletion backends/gcp-pubsub/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/batchcorp/plumber/backends/gcp-pubsub/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
"github.com/batchcorp/plumber/stats"
)

type Relayer struct {
Expand Down Expand Up @@ -85,12 +86,13 @@ func (r *Relayer) Relay() error {
defer msg.Ack()
}

stats.Incr("gcp-relay-consumer", 1)

r.log.Debug("Writing message to relay channel")

r.RelayCh <- &types.RelayMessage{
Value: msg,
}

}

for {
Expand Down
3 changes: 1 addition & 2 deletions backends/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
)

const (
DefaultMaxBytes = 1048576 // 1MB
DefaultMaxWait = 50 * time.Millisecond
DefaultBatchSize = 1
)

Expand Down Expand Up @@ -81,6 +79,7 @@ func NewReader(opts *cli.Options) (*KafkaReader, error) {
r := skafka.NewReader(skafka.ReaderConfig{
Brokers: []string{opts.Kafka.Address},
GroupID: opts.Kafka.ReadGroupId,
CommitInterval: opts.Kafka.CommitInterval,
Topic: opts.Kafka.Topic,
Dialer: dialer,
MaxWait: opts.Kafka.MaxWait,
Expand Down
7 changes: 6 additions & 1 deletion backends/kafka/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/batchcorp/plumber/backends/kafka/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
"github.com/batchcorp/plumber/stats"
)

type Relayer struct {
Expand Down Expand Up @@ -102,7 +103,11 @@ func (r *Relayer) Relay() error {
r.log.Errorf("Unable to read message: %s", err)
continue
}
r.log.Infof("Writing Kafka message to relay channel: %s", msg.Value)

stats.Incr("kafka-relay-consumer", 1)

r.log.Debugf("Writing Kafka message to relay channel: %s", msg.Value)

r.RelayCh <- &types.RelayMessage{
Value: &msg,
Options: &types.RelayMessageOptions{},
Expand Down
6 changes: 6 additions & 0 deletions backends/rabbitmq/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rabbitmq
import (
"context"
"github.com/batchcorp/plumber/backends/rabbitmq/types"
"github.com/batchcorp/plumber/stats"

"github.com/batchcorp/rabbit"
"github.com/jhump/protoreflect/desc"
"github.com/pkg/errors"
Expand Down Expand Up @@ -119,7 +121,11 @@ func (r *Relayer) Relay() error {
// this will also prevent log spam if a queue goes missing
return nil
}

stats.Incr("rabbit-relay-consumer", 1)

r.log.Debugf("Writing RabbitMQ message to relay channel: %+v", msg)

r.RelayCh <- &types.RelayMessage{
Value: &msg,
Options: &types.RelayMessageOptions{},
Expand Down
37 changes: 28 additions & 9 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
)

const (
DefaultGRPCAddress = "grpc-collector.batch.sh:9000"
DefaultHTTPListenAddress = ":8080"
DefaultGRPCTimeout = "10s"
DefaultNumWorkers = "10"
DefaultGRPCAddress = "grpc-collector.batch.sh:9000"
DefaultHTTPListenAddress = ":8080"
DefaultGRPCTimeout = "10s"
DefaultNumWorkers = "10"
DefaultStatsReportInterval = "5s"
DefaultBatchSize = "10"
)

var (
Expand All @@ -25,11 +27,13 @@ var (

type Options struct {
// Global
Debug bool
Quiet bool
Action string
Version string
Backend string
Debug bool
Quiet bool
Stats bool
StatsReportInterval time.Duration
Action string
Version string
Backend string

// Serializers
AvroSchemaFile string
Expand All @@ -42,6 +46,7 @@ type Options struct {
RelayNumWorkers int
RelayGRPCTimeout time.Duration
RelayGRPCDisableTLS bool
RelayBatchSize int

// Shared read flags
ReadProtobufRootMessage string
Expand Down Expand Up @@ -97,6 +102,15 @@ func Handle(cliArgs []string) (string, *Options, error) {
Short('q').
BoolVar(&opts.Quiet)

app.Flag("stats", "Display periodic read/write/relay stats").
Envar("PLUMBER_STATS").
BoolVar(&opts.Stats)

app.Flag("stats-report-interval", "Interval at which periodic stats are displayed").
Envar("PLUMBER_STATS_REPORT_INTERVAL").
Default(DefaultStatsReportInterval).
DurationVar(&opts.StatsReportInterval)

// Specific actions
readCmd := app.Command("read", "Read message(s) from messaging system")
writeCmd := app.Command("write", "Write message(s) to messaging system")
Expand Down Expand Up @@ -238,6 +252,11 @@ func HandleRelayFlags(relayCmd *kingpin.CmdClause, opts *Options) {
Default(DefaultHTTPListenAddress).
Envar("PLUMBER_RELAY_HTTP_LISTEN_ADDRESS").
StringVar(&opts.RelayHTTPListenAddress)

relayCmd.Flag("batch-size", "How many messages to batch before sending them to grpc-collector").
Default(DefaultBatchSize).
Envar("PLUMBER_RELAY_BATCH_SIZE").
IntVar(&opts.RelayBatchSize)
}

func ValidateProtobufOptions(dirs []string, rootMessage string) error {
Expand Down
Loading

0 comments on commit c3ddfee

Please sign in to comment.