Merge pull request #13 from azarakovskiy/kafka-consumer-built-in
Add proton internal consumer
peterklijn authored Mar 25, 2022
2 parents 1fa78e3 + 30163b3 commit 4cb704d
Showing 11 changed files with 792 additions and 27 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
@@ -16,10 +16,10 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@master

- name: Set up Go (1.15)
- name: Set up Go (1.17)
uses: actions/setup-go@v1
with:
go-version: 1.15
go-version: 1.17

- name: Linter
run: |
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
@@ -17,10 +17,10 @@ jobs:
- name: Unshallow
run: git fetch --prune --unshallow

- name: Set up Go (1.15)
- name: Set up Go (1.17)
uses: actions/setup-go@v1
with:
go-version: 1.15
go-version: 1.17

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@master
73 changes: 70 additions & 3 deletions README.md
@@ -18,7 +18,7 @@ brew tap beatlabs/proton https://github.com/beatlabs/proton
brew install proton
```

## Usage
## Usage as a Protobuf-to-JSON converter

```shell script
Usage:
@@ -35,7 +35,7 @@ Flags:
Defaults to the first message type in the proto file if not specified
```
## Examples
### Examples
Proto file from URL with input message as argument
```shell script
@@ -62,7 +62,7 @@ Multiple proto files from a producer with input messages piped
./testdata/producer.sh '--END--' | proton json -f ./testdata/addressbook.proto -m '--END--'
```
### Usage with Kafka consumers
### Piping data from Kafkacat
Because Proto bytes can contain newlines (`\n`) and often do,
we need to use a different marker to delimit the end of a message byte-stream and the beginning of the next.
@@ -91,3 +91,70 @@ stdbuf -o0 kcat -b my-broker:9092 -t my-topic -f '%s--END--' -o beginning | prot
If you don't have `stdbuf`, you can install it via `brew install coreutils`.
## Using proton as a standalone Kafka consumer
Proton can consume from Kafka directly. The syntax of all the parameters is kept as close as possible to Kafkacat's.
```shell
$ proton consume --help
consume from given topics
Usage:
proton consume [flags]
Flags:
-b, --broker string Broker URL to consume from
-f, --format string
A Kcat-like format string. Defaults to "%Tf: %s".
Format string tokens:
%s Message payload
%k Message key
%t Topic
%p Partition
%o Offset
%T Message timestamp (milliseconds since epoch UTC)
%Tf Message time formatted as RFC3339
\n \r \t Newline, carriage return, tab
Example:
-f 'Key: %k, Time: %Tf \nValue: %s' (default "%Tf: %s")
-h, --help help for consume
--key string Grep RegExp for a key value (default ".*")
-o, --offsets strings
Offsets to start and stop consuming at
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))
--proto string A path to a proto file or a URL to it
-t, --topic string A topic to consume from
-v, --verbose Whether to print out proton's debug messages
```
The minimal configuration to run Proton as a standalone consumer is
```shell
proton consume -b my-broker -t my-topic --proto ./my-schema.proto
```
This would consume all the messages from the topic since its start and use default formatting.
You can specify the start and/or the end offset timestamp in milliseconds. Both are optional.
```shell
proton consume -b my-broker -t my-topic --proto ./my-schema.proto -o s@1646218065015 -o e@1646218099197
```
If the end offset is set, proton will stop consuming once it's reached. Otherwise, it will keep consuming.
You can specify the format of the output.
```shell
$ proton consume -b my-broker -t my-topic --proto ./my-schema.proto -f "Time: %T \t %k\t%s"
# ...
Time: 1646218065015 key {"field1":"value1","field2":"value2"}
Time: 1646218099197 key {"field1":"value1","field2":"value2"}
# ...
```
Run `proton consume -h` to see all the available formatting options.
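The token expansion described by the format string can be sketched as follows. This is a hypothetical `formatMessage` helper covering a subset of the tokens, not proton's actual implementation:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// formatMessage expands a subset of the Kcat-like format tokens.
// %Tf is registered before %T so the longer token wins during scanning
// (strings.NewReplacer tries patterns in argument order at each position).
func formatMessage(format, key, payload string, tsMillis int64) string {
	t := time.UnixMilli(tsMillis).UTC()
	r := strings.NewReplacer(
		"%Tf", t.Format(time.RFC3339),
		"%T", strconv.FormatInt(tsMillis, 10),
		"%k", key,
		"%s", payload,
		`\n`, "\n",
		`\r`, "\r",
		`\t`, "\t",
	)
	return r.Replace(format)
}

func main() {
	fmt.Println(formatMessage(`Time: %T \t %k\t%s`, "key", `{"field1":"value1"}`, 1646218065015))
}
```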
To filter out keys, you can use the `--key <regexp>` option, as in these examples:
```shell
proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-key"
proton consume -b my-broker -t my-topic --proto ./my-schema.proto --key "my-k.*"
```
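Under the hood this is ordinary regular-expression matching against the message key. A minimal sketch with a hypothetical `keyMatcher` helper — it matches substrings like grep does, and whether proton anchors the pattern is an assumption of this sketch:

```go
package main

import (
	"fmt"
	"regexp"
)

// keyMatcher compiles the --key expression once and returns a predicate
// over message keys. Illustrative only, not proton's implementation.
func keyMatcher(expr string) (func(string) bool, error) {
	re, err := regexp.Compile(expr)
	if err != nil {
		return nil, err
	}
	return re.MatchString, nil
}

func main() {
	match, err := keyMatcher("my-k.*")
	if err != nil {
		panic(err)
	}
	fmt.Println(match("my-key"), match("other-key")) // true false
}
```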
148 changes: 148 additions & 0 deletions cmd/consume.go
@@ -0,0 +1,148 @@
package cmd

import (
	"bytes"
	"context"
	"log"
	"os"
	"os/signal"
	"strconv"
	"strings"

	"github.com/Shopify/sarama"
	"github.com/beatlabs/proton/v2/internal/consumer"
	"github.com/beatlabs/proton/v2/internal/json"
	"github.com/beatlabs/proton/v2/internal/output"
	"github.com/beatlabs/proton/v2/internal/protoparser"
	"github.com/spf13/cobra"
)

// consumeCmd represents the consume command.
var consumeCmd = &cobra.Command{
	Use:   "consume",
	Short: "consume from given topics",
	Run:   Run,
}

// ConsumeCfg is the config for everything this tool needs.
type ConsumeCfg struct {
	consumerCfg consumer.Cfg
	offsets     []string
	model       string
	format      string
}

var consumeCfg = &ConsumeCfg{
	consumerCfg: consumer.Cfg{},
}

func init() {
	rootCmd.AddCommand(consumeCmd)

	consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.URL, "broker", "b", "", "Broker URL to consume from")
	if consumeCmd.MarkFlagRequired("broker") != nil {
		log.Fatal("you must specify a broker URL using the `-b <url>` option")
	}

	consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.Topic, "topic", "t", "", "A topic to consume from")
	if consumeCmd.MarkFlagRequired("topic") != nil {
		log.Fatal("you must specify a topic to consume using the `-t <topic>` option")
	}

	consumeCmd.Flags().StringVarP(&consumeCfg.model, "proto", "", "", "A path to a proto file or a URL to it")
	if consumeCmd.MarkFlagRequired("proto") != nil {
		log.Fatal("you must specify a proto file using the `--proto <path>` option")
	}

	consumeCmd.Flags().StringVarP(&consumeCfg.format, "format", "f", "%Tf: %s", `
A Kcat-like format string. Defaults to "%Tf: %s".
Format string tokens:
%s Message payload
%k Message key
%t Topic
%p Partition
%o Offset
%T Message timestamp (milliseconds since epoch UTC)
%Tf Message time formatted as RFC3339
\n \r \t Newline, carriage return, tab
Example:
-f 'Key: %k, Time: %Tf \nValue: %s'`)

	consumeCmd.Flags().StringSliceVarP(&consumeCfg.offsets, "offsets", "o", []string{}, `
Offsets to start and stop consuming at
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))
`)

	consumeCmd.Flags().StringVarP(&consumeCfg.consumerCfg.KeyGrep, "key", "", ".*", "Grep RegExp for a key value")

	consumeCmd.Flags().BoolVarP(&consumeCfg.consumerCfg.Verbose, "verbose", "v", false, "Whether to print out proton's debug messages")
}

// Run runs this whole thing.
func Run(cmd *cobra.Command, _ []string) {
	ctx, cancel := context.WithCancel(cmd.Context())
	defer cancel()

	protoParser, fileName, err := protoparser.New(ctx, consumeCfg.model)
	if err != nil {
		log.Fatal(err)
	}

	consumeCfg.consumerCfg.Start, consumeCfg.consumerCfg.End = parseOffsets(consumeCfg.offsets)

	kafka, err := consumer.NewKafka(ctx, consumeCfg.consumerCfg,
		&protoDecoder{json.Converter{
			Parser:   protoParser,
			Filename: fileName,
		}}, output.NewFormatterPrinter(consumeCfg.format, os.Stdout, os.Stderr))
	if err != nil {
		log.Fatal(err)
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	errCh := kafka.Run()

	select {
	case err := <-errCh:
		if err != nil {
			log.Fatal(err)
		}
	case <-signals:
	}
}

type protoDecoder struct {
	json.Converter
}

// Decode uses the existing JSON decoder and adapts it to this consumer.
func (p *protoDecoder) Decode(rawData []byte) (string, error) {
	stream, errCh := p.ConvertStream(bytes.NewReader(rawData))
	select {
	case msg := <-stream:
		return string(msg), nil
	case err := <-errCh:
		return "", err
	}
}

func parseOffsets(offsets []string) (int64, int64) {
	return parseOffset("s@", offsets, sarama.OffsetOldest), parseOffset("e@", offsets, sarama.OffsetNewest)
}

// parseOffset returns the first offset carrying the given prefix that parses
// as an integer, or defaultVal if there is none. ParseInt is used so that
// millisecond timestamps do not overflow on 32-bit platforms.
func parseOffset(prefix string, offsets []string, defaultVal int64) int64 {
	for _, offset := range offsets {
		if strings.HasPrefix(offset, prefix) {
			v, err := strconv.ParseInt(offset[len(prefix):], 10, 64)
			if err == nil {
				return v
			}
		}
	}
	return defaultVal
}
58 changes: 58 additions & 0 deletions cmd/consume_test.go
@@ -0,0 +1,58 @@
package cmd

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/stretchr/testify/assert"
)

func TestParseOffsets(t *testing.T) {
	tests := []struct {
		name               string
		given              []string
		startTime, endTime int64
	}{
		{
			name:      "no offsets specified",
			given:     []string{},
			startTime: sarama.OffsetOldest,
			endTime:   sarama.OffsetNewest,
		},
		{
			name:      "start offset specified",
			given:     []string{"s@24"},
			startTime: 24,
			endTime:   sarama.OffsetNewest,
		},
		{
			name:      "end offset specified",
			given:     []string{"e@42"},
			startTime: sarama.OffsetOldest,
			endTime:   42,
		},
		{
			name:      "both offsets specified",
			given:     []string{"s@24", "e@42"},
			startTime: 24,
			endTime:   42,
		},
		{
			name:      "multiple offsets specified",
			given:     []string{"s@24", "e@42", "s@123", "e@321"},
			startTime: 24,
			endTime:   42,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// when
			r1, r2 := parseOffsets(test.given)

			// then
			assert.Equal(t, test.startTime, r1)
			assert.Equal(t, test.endTime, r2)
		})
	}
}
43 changes: 41 additions & 2 deletions go.mod
@@ -1,14 +1,53 @@
module github.com/beatlabs/proton/v2

go 1.15
go 1.17

require (
github.com/Shopify/sarama v1.32.0
github.com/golang/protobuf v1.4.1
github.com/jhump/protoreflect v1.7.0
github.com/mitchellh/go-homedir v1.1.0
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
google.golang.org/protobuf v1.25.0
gopkg.in/h2non/gock.v1 v1.0.15
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)