Skip to content

Commit

Permalink
refactor to unify predicate handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Aug 24, 2024
1 parent c09ebb2 commit de5fca1
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 94 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -848,13 +848,15 @@ command, the following context is set:

In the `sub` and `tap` commands, the following bindings are available:

* the current received message is bound to the variable [msg](#message-type),
* the current received message is bound to the variable [rt_msg](#message-type),
which allows access to the message-metadata and the body
* the current count of messages received that passed the filter is bound to
`rt_count`
* Helper function are provided for accessing the message body:
* the `toStr` function converts a byte buffer into a string, e.g. `let
b=toJSON(toStr(msg.Body))`
* the `gunzip` function decompresses the given byte buffer `let
b=toJSON(toStr(gunzip(msg.Body)))`, allowing to inspect a compressed body
* the `rt_toStr` function converts a byte buffer into a string, e.g. `let
b=toJSON(rt_toStr(rt_msg.Body))`
* the `rt_gunzip` function decompresses the given byte buffer `let
b=toJSON(rt_toStr(rt_gunzip(rt_msg.Body)))`, allowing to inspect a compressed body

##### Examples

Expand All @@ -869,9 +871,9 @@ broker to be used, e.g. `http://guest:guest@localhost:15672/api`).
before, but consider only exchanges of type `topic`.
* `rabtap info --filter "queue.Consumers > 0" --omit --stats --consumers` - print
all queues with at least one consume
* `rabtap sub JDQ --filter="msg.RoutingKey == 'test'"` - print only messages that
* `rabtap sub JDQ --filter="rt_msg.RoutingKey == 'test'"` - print only messages that
were sent with the routing key `test`.
* `rabtap sub JDQ --filter="let b=fromJSON(toStr(gunzip(msg.Body))); b.Name == 'JAN'"` -
* `rabtap sub JDQ --filter="let b=fromJSON(rt_toStr(rt_gunzip(rt_msg.Body))); b.Name == 'JAN'"` -
print only messages that have `.Name == "JAN"` in their gzipped payload,
interpreted as `JSON`

Expand Down
4 changes: 2 additions & 2 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type CmdSubscribeArg struct {
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
messageReceiveLoopPred MessagePred
filterPred MessagePred
messageReceiveLoopPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
Expand Down
13 changes: 4 additions & 9 deletions cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"io"
"net/url"
"os"
"syscall"
"testing"
"time"

Expand All @@ -37,7 +36,7 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
messageReceiveLoopPred: func(rabtap.TapMessage) (bool, error) { return false, nil },
messageReceiveLoopPred: &constantPred{false},
timeout: time.Second * 10,
})
done <- true
Expand Down Expand Up @@ -83,8 +82,8 @@ func TestCmdSub(t *testing.T) {
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
filterPred: func(rabtap.TapMessage) (bool, error) { return true, nil },
messageReceiveLoopPred: func(rabtap.TapMessage) (bool, error) { return false, nil },
filterPred: &constantPred{true},
messageReceiveLoopPred: &constantPred{false},
timeout: time.Second * 10,
})

Expand Down Expand Up @@ -150,16 +149,12 @@ func TestCmdSubIntegration(t *testing.T) {
})
require.Nil(t, err)

go func() {
time.Sleep(time.Second * 2)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"rabtap", "sub",
"--uri", amqpURL.String(),
testQueue,
"--limit=1",
"--format=raw",
"--no-color"}
output := testcommon.CaptureOutput(main)
Expand Down
4 changes: 2 additions & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type CmdTapArg struct {
tapConfig []rabtap.TapConfiguration
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
termPred MessagePred
filterPred MessagePred
termPred Predicate
filterPred Predicate
timeout time.Duration
}

Expand Down
14 changes: 5 additions & 9 deletions cmd/rabtap/cmd_tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"crypto/tls"
"os"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -45,15 +44,12 @@ func TestCmdTap(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

filterPred := func(rabtap.TapMessage) (bool, error) { return true, nil }
termPred := func(rabtap.TapMessage) (bool, error) { return true, nil }

// when
go cmdTap(ctx, CmdTapArg{tapConfig: tapConfig,
tlsConfig: &tls.Config{},
messageReceiveFunc: receiveFunc,
filterPred: filterPred,
termPred: termPred,
filterPred: &constantPred{true},
termPred: &constantPred{false},
timeout: time.Second * 10})

time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -84,8 +80,9 @@ func TestCmdTapIntegration(t *testing.T) {
testKey := testQueue
testExchange := "amq.topic"

// message must be published, after rabtap tap command is started
go func() {
time.Sleep(time.Second * 1)
time.Sleep(3 * time.Second)
_, ch := testcommon.IntegrationTestConnection(t, "", "", 0, false)
err := ch.Publish(
testExchange,
Expand All @@ -99,15 +96,14 @@ func TestCmdTapIntegration(t *testing.T) {
Headers: amqp.Table{},
})
require.Nil(t, err)
time.Sleep(time.Second * 1)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"rabtap", "tap",
"--uri", testcommon.IntegrationURIFromEnv().String(),
"amq.topic:" + testKey,
"--limit=1",
"--format=raw",
"--no-color"}
output := testcommon.CaptureOutput(main)
Expand Down
10 changes: 6 additions & 4 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
messageReceiveFunc, err := createMessageReceiveFunc(opts)
failOnError(err, "options", os.Exit)

termPred := createCountingMessageReceivePred(args.Limit)
termPred, err := createCountingMessageReceivePred(args.Limit)
failOnError(err, "invalid message limit predicate", os.Exit)
filterPred, err := NewExprPredicate(args.Filter)
failOnError(err, fmt.Sprintf("invalid message filter predicate '%s'", args.Filter), os.Exit)

Expand All @@ -170,7 +171,7 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) {
reject: args.Reject,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageReceiveFunc: messageReceiveFunc,
filterPred: createMessagePred(filterPred),
filterPred: filterPred,
messageReceiveLoopPred: termPred,
args: args.Args,
timeout: args.IdleTimeout,
Expand All @@ -188,7 +189,8 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) {
}
messageReceiveFunc, err := createMessageReceiveFunc(opts)
failOnError(err, "options", os.Exit)
termPred := createCountingMessageReceivePred(args.Limit)
termPred, err := createCountingMessageReceivePred(args.Limit)
failOnError(err, "invalid message limit predicate", os.Exit)
filterPred, err := NewExprPredicate(args.Filter)
failOnError(err, fmt.Sprintf("invalid message filter predicate '%s'", args.Filter), os.Exit)

Expand All @@ -197,7 +199,7 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) {
tapConfig: args.TapConfig,
tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile),
messageReceiveFunc: messageReceiveFunc,
filterPred: createMessagePred(filterPred),
filterPred: filterPred,
termPred: termPred,
timeout: args.IdleTimeout,
})
Expand Down
22 changes: 20 additions & 2 deletions cmd/rabtap/predicate_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (

// ExprPredicate is a Predicate that evaluates using the expr package
type ExprPredicate struct {
prog *vm.Program
initialEnv map[string]interface{}
prog *vm.Program
}

// NewExprPredicate creates a new predicate expression
// NewExprPredicate creates a new predicate expression with an optional initial environment
func NewExprPredicate(exprstr string) (*ExprPredicate, error) {
prog, err := expr.Compile(exprstr)
if err != nil {
Expand All @@ -23,8 +24,25 @@ func NewExprPredicate(exprstr string) (*ExprPredicate, error) {
return &ExprPredicate{prog: prog}, nil
}

// NewExprPredicate creates a new predicate expression with an optional initial environment
func NewExprPredicateWithEnv(exprstr string, env map[string]interface{}) (*ExprPredicate, error) {
options := []expr.Option{
expr.Env(env),
expr.AllowUndefinedVariables(),
expr.AsBool()}

prog, err := expr.Compile(exprstr, options...)
if err != nil {
return nil, err
}
return &ExprPredicate{prog: prog, initialEnv: env}, nil
}

// Eval evaluates the expression with a given set of parameters
func (s ExprPredicate) Eval(env map[string]interface{}) (bool, error) {
for k, v := range s.initialEnv {
env[k] = v
}
result, err := expr.Run(s.prog, env)
if err != nil {
return false, err
Expand Down
29 changes: 23 additions & 6 deletions cmd/rabtap/predicate_expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,28 @@ func TestExprPredicateFalse(t *testing.T) {
assert.False(t, res)
}

func TestExprPredicateWithEnv(t *testing.T) {
func TestExprPredicateWithInitalEnv(t *testing.T) {
initEnv := map[string]interface{}{"a": 1337}
f, err := NewExprPredicateWithEnv(`b < a`, initEnv)
require.NoError(t, err)

env := map[string]interface{}{"b": 100}
res, err := f.Eval(env)

require.NoError(t, err)
assert.True(t, res)
}
func TestExprPredicateWithEvalEnv(t *testing.T) {
f, err := NewExprPredicate(`a == 1337 && b.X == 42 && c == "JD"`)
require.NoError(t, err)
params := make(map[string]interface{}, 1)
params["a"] = 1337
params["b"] = struct{ X int }{X: 42}
params["c"] = "JD"
res, err := f.Eval(params)
env := map[string]interface{}{
"a": 1337,
"b": struct{ X int }{X: 42},
"c": "JD",
}

res, err := f.Eval(env)

require.NoError(t, err)
assert.True(t, res)
}
Expand All @@ -45,13 +59,16 @@ func TestExprPredicateReturnsErrorOnInvalidSyntax(t *testing.T) {
func TestExprPredicateReturnsErrorOnEvalError(t *testing.T) {
f, err := NewExprPredicate("(1/a) == 1")
require.NoError(t, err)

_, err = f.Eval(nil)
assert.ErrorContains(t, err, "invalid operation")
}
func TestExprPredicateReturnsErrorOnNonBoolReturnValue(t *testing.T) {
f, err := NewExprPredicate("1+1")
require.NoError(t, err)

params := map[string]interface{}{}
_, err = f.Eval(params)

assert.ErrorContains(t, err, "expression does not evaluate to bool")
}
Loading

0 comments on commit de5fca1

Please sign in to comment.