Skip to content

Commit

Permalink
Merge pull request #327 from batchcorp/dselans/rabbitmq-read-filter
Browse files Browse the repository at this point in the history
added ability to exclude read messages from rabbitmq based on routing key regex
  • Loading branch information
blinktag committed Nov 30, 2022
2 parents 5710006 + 7b91d5a commit e8ce635
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 106 deletions.
19 changes: 19 additions & 0 deletions backends/rabbitmq/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"time"

"github.com/batchcorp/plumber/validate"
Expand All @@ -22,6 +23,17 @@ func (r *RabbitMQ) Read(ctx context.Context, readOpts *opts.ReadOptions, results
return errors.Wrap(err, "unable to validate read config")
}

var excludeRegexp *regexp.Regexp

if readOpts.Rabbit.Args.ExcludeBindingKeyRegex != "" {
var err error

excludeRegexp, err = regexp.Compile(readOpts.Rabbit.Args.ExcludeBindingKeyRegex)
if err != nil {
return errors.Wrap(err, "unable to compile exclude regex")
}
}

// Check if nil to allow unit testing injection into struct
if r.client == nil {
consumer, err := r.newRabbitForRead(readOpts.Rabbit.Args)
Expand All @@ -41,6 +53,13 @@ func (r *RabbitMQ) Read(ctx context.Context, readOpts *opts.ReadOptions, results
r.log.Info("Listening for messages...")

go r.client.Consume(ctx, errCh, func(msg amqp.Delivery) error {
if excludeRegexp != nil && excludeRegexp.Match([]byte(msg.RoutingKey)) {
r.log.Debugf("consumed message for routing key '%s' matches filter '%s' - skipping",
msg.RoutingKey, readOpts.Rabbit.Args.ExcludeBindingKeyRegex)

return nil
}

count++

serializedMsg, err := json.Marshal(msg)
Expand Down
3 changes: 2 additions & 1 deletion backends/rabbitmq/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

"github.com/batchcorp/rabbit"

"github.com/batchcorp/plumber/backends/rabbitmq/rabbitfakes"
"github.com/batchcorp/plumber/validate"
"github.com/batchcorp/rabbit"
)

var _ = Describe("RabbitMQ Backend", func() {
Expand Down
18 changes: 18 additions & 0 deletions backends/rabbitmq/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq

import (
"context"
"regexp"
"time"

"github.com/pkg/errors"
Expand All @@ -21,6 +22,17 @@ func (r *RabbitMQ) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rela
return errors.Wrap(err, "unable to verify options")
}

var excludeRegexp *regexp.Regexp

if relayOpts.Rabbit.Args.ExcludeBindingKeyRegex != "" {
var err error

excludeRegexp, err = regexp.Compile(relayOpts.Rabbit.Args.ExcludeBindingKeyRegex)
if err != nil {
return errors.Wrap(err, "unable to compile exclude regex")
}
}

// Check if nil to allow unit testing injection into struct
if r.client == nil {
consumer, err := r.newRabbitForRead(relayOpts.Rabbit.Args)
Expand All @@ -36,6 +48,12 @@ func (r *RabbitMQ) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rela
errCh := make(chan *rabbit.ConsumeError)

go r.client.Consume(ctx, errCh, func(msg amqp.Delivery) error {
if excludeRegexp != nil && excludeRegexp.Match([]byte(msg.RoutingKey)) {
r.log.Debugf("consumed message for routing key '%s' matches filter '%s' - skipping",
msg.RoutingKey, relayOpts.Rabbit.Args.ExcludeBindingKeyRegex)

return nil
}

if msg.Body == nil {
// Ignore empty messages
Expand Down
1 change: 1 addition & 0 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
| PLUMBER_RELAY_RABBIT_AUTOACK | Automatically acknowledge receipt of read/received messages | true | false |
| PLUMBER_RELAY_CONSUMER_TAG | How to identify the consumer to RabbitMQ | plumber | false |
| PLUMBER_RELAY_RABBIT_QUEUE_AUTO_DELETE | Whether to auto-delete the queue after plumber has disconnected | true | false |
| PLUMBER_RELAY_RABBIT_BINDING_KEY_EXCLUDE_REGEX | Exclude messages with routing key matching regex | | false |

### Redis PubSub

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.170
github.com/batchcorp/plumber-schemas v0.0.172
github.com/batchcorp/rabbit v0.1.17
github.com/batchcorp/thrifty v0.0.10
github.com/dustin/go-humanize v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ github.com/batchcorp/plumber-schemas v0.0.168 h1:3rfTdNPGDoTqw+OOkoFpRMjufuBzVKa
github.com/batchcorp/plumber-schemas v0.0.168/go.mod h1:KxBXjHwZhPZoJryfv0c+aGFntx+rw2euYOqcNgqQlik=
github.com/batchcorp/plumber-schemas v0.0.170 h1:Pq9dYvCzUBB4qkmOoRVvGsU/A99vhsnofH3jsJaFl1A=
github.com/batchcorp/plumber-schemas v0.0.170/go.mod h1:KxBXjHwZhPZoJryfv0c+aGFntx+rw2euYOqcNgqQlik=
github.com/batchcorp/plumber-schemas v0.0.171 h1:7jBUcpjMEbUq9HuKxa35Gp/hy+c4TVb3gKoDQUCLl7s=
github.com/batchcorp/plumber-schemas v0.0.171/go.mod h1:KxBXjHwZhPZoJryfv0c+aGFntx+rw2euYOqcNgqQlik=
github.com/batchcorp/plumber-schemas v0.0.172 h1:0aS9EBpjzyTfFPMmTx30eu3mGGrYPyTFpalgnNXRwFw=
github.com/batchcorp/plumber-schemas v0.0.172/go.mod h1:KxBXjHwZhPZoJryfv0c+aGFntx+rw2euYOqcNgqQlik=
github.com/batchcorp/plz v0.9.2 h1:bPqb+sn7OUrpHjeTEI9YO4BJS9IQ7AstDDz2gn+tcn8=
github.com/batchcorp/plz v0.9.2/go.mod h1:3gacX+hQo+xvl0vtLqCMufzxuNCwt4geAVOMt2LQYfE=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e8ce635

Please sign in to comment.