Implement filters on deployment, job & origin
This allows filtering of events based on the above-mentioned fields.
Multiple filters can be configured and they will run against a message
in the order specified.

A filter has the form

  field₀:comparator₀:value₀;field₁:comparator₁:value₁;…;fieldₙ:comparatorₙ:valueₙ

`field` is the message's field to check against.
`comparator` is how to compare the `field`'s value to the user-provided value ('mustContain', 'mustNotContain').
`value` is the value the user wants to check messages for.
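
For example, a configuration like (illustrative values)

  deployment:mustContain:cf-deployment;origin:mustNotContain:gorouter

keeps only events whose deployment contains 'cf-deployment' and drops
events whose origin contains 'gorouter'.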

Benchmarks have been added to get an understanding of the performance
impact of those filters.
hoegaarden committed Mar 10, 2021
1 parent 1ba369d commit fc74374
Showing 11 changed files with 541 additions and 25 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -88,6 +88,10 @@ This is recommended for dev environments only.
* `BOLTDB_PATH`: Bolt database path.
* `EVENTS`: A comma separated list of events to include. Possible values: ValueMetric,CounterEvent,Error,LogMessage,HttpStartStop,ContainerMetric
* `EXTRA_FIELDS`: Extra fields to annotate your events with (format is key:value,key:value).
* `FILTERS`: Filter events on deployment, job, or origin. The format is `field₀:comparator₀:value₀;field₁:comparator₁:value₁;…;fieldₙ:comparatorₙ:valueₙ` where
* `<field>` is the message's field to check against, e.g. `deployment` or `job`
* `<comparator>` is how to compare the `<field>`'s value to the user-provided `<value>`, e.g. `mustContain` or `mustNotContain`
* `<value>` is the actual value the message's `<field>` should be checked for
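* Example (illustrative values): `FILTERS=deployment:mustContain:cf-deployment;job:mustNotContain:router` keeps only events whose deployment contains `cf-deployment` and drops events whose job contains `router`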
* `FLUSH_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for flushing queue to Splunk regardless of CONSUMER_QUEUE_SIZE. Protects against stale events in low throughput systems.
* `CONSUMER_QUEUE_SIZE`: Sets the internal consumer queue buffer size. Events will be pushed to Splunk after queue is full.
* `HEC_BATCH_SIZE`: Set the batch size for the events to push to HEC (Splunk HTTP Event Collector).
13 changes: 13 additions & 0 deletions eventfilter/eventfilter_suite_test.go
@@ -0,0 +1,13 @@
package eventfilter_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestEventfilter(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Eventfilter Suite")
}
174 changes: 174 additions & 0 deletions eventfilter/filters.go
@@ -0,0 +1,174 @@
package eventfilter

import (
"fmt"
"strings"

"github.com/cloudfoundry/sonde-go/events"
)

const (
filterSep = ";"
filterValuesSep = ":"
)

// getterFunc is a function that, given an Envelope, returns the things we care
// about, e.g. the Deployment, Job, ...
type getterFunc = func(msg *events.Envelope) string

// supportedGetters are all supported keys we can use for filters and the
// getters / functions that pull the respective data out of an envelope.
var supportedGetters = map[string]getterFunc{
"deployment": func(msg *events.Envelope) string {
return msg.GetDeployment()
},
"origin": func(msg *events.Envelope) string {
return msg.GetOrigin()
},
"job": func(msg *events.Envelope) string {
return msg.GetJob()
},
}

// filterFunc gets called with data from the message envelope, pulled out by a
// getter, and compares it to the user provided value. If it returns true, this
// message should be accepted, else the message should be dropped.
type filterFunc = func(msgData, userInput string) bool

// supportedFilters are all supported filter names and
// the filters / functions that run against the data from the message and
// compare it to the data provided by the user.
// E.g. when we have
// - the filter func strings.Contains
// - the getter that gets the message's origin
// - and the value 'foo'
// only messages whose origin contains 'foo' will be accepted by the filter
var supportedFilters = map[string]filterFunc{
"mustContain": strings.Contains,
"mustNotContain": func(msgData, userInput string) bool { return !strings.Contains(msgData, userInput) },
}

// SupportedFilterKeys lists all supported filter keys. This is only used to
// signal the list of supported keys to users, e.g. for the usage text.
var SupportedFilterKeys = func() []string {
keys := make([]string, 0, len(supportedGetters))
for k := range supportedGetters {
keys = append(keys, k)
}

return keys
}()

// SupportedFilters lists all supported filter names. This is only used to
// signal the list of supported filters to users, e.g. for the usage text.
var SupportedFilters = func() []string {
keys := make([]string, 0, len(supportedFilters))
for k := range supportedFilters {
keys = append(keys, k)
}

return keys
}()

// Filters is something that can tell its Length (the number of its configured
// filters) and can be used to check if an envelope is accepted or should be
// dropped/discarded.
type Filters interface {
Accepts(*events.Envelope) bool
Length() int
}

type filterRule struct {
getter getterFunc
filter filterFunc
value string
}

var (
errInvalidFormat = fmt.Errorf("format must be '%q:%q:<value>'", SupportedFilterKeys, SupportedFilters)
errEmptyValue = fmt.Errorf("filter value must not be empty")
errInvalidFilter = fmt.Errorf("filter comparator must be one of %q", SupportedFilters)
errInvalidFilterKey = fmt.Errorf("filter key must be one of %q", SupportedFilterKeys)
)

func parseFilterConfig(filters string) ([]filterRule, error) {
rules := []filterRule{}

for _, filterRaw := range strings.Split(filters, filterSep) {
filter := strings.TrimSpace(filterRaw)

if filter == "" {
continue
}

tokens := strings.Split(filter, filterValuesSep)
if len(tokens) != 3 {
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat)
}

getterKey := strings.TrimSpace(strings.ToLower(tokens[0]))
filterKey := strings.TrimSpace(tokens[1])

var ok bool
rule := filterRule{
value: tokens[2],
}

if rule.value == "" {
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue)
}

rule.filter, ok = supportedFilters[filterKey]
if !ok {
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFilter)
}

rule.getter, ok = supportedGetters[getterKey]
if !ok {
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFilterKey)
}

rules = append(rules, rule)
}

return rules, nil
}

type filter func(*events.Envelope) bool

type filters []filter

func (ef *filters) Accepts(msg *events.Envelope) bool {
for _, f := range *ef {
if allow := f(msg); !allow {
return false
}
}

return true
}

func (ef *filters) Length() int {
return len(*ef)
}

func (ef *filters) addFilter(valueGetter getterFunc, valueFilter filterFunc, value string) {
*ef = append(*ef, func(msg *events.Envelope) bool {
return valueFilter(valueGetter(msg), value)
})
}

func New(filterList string) (Filters, error) {
f := &filters{}

rules, err := parseFilterConfig(filterList)
if err != nil {
return nil, err
}

for _, rule := range rules {
f.addFilter(rule.getter, rule.filter, rule.value)
}

return f, nil
}
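
A minimal usage sketch for the new package (hypothetical program; the filter string and envelope values are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
	"github.com/cloudfoundry/sonde-go/events"
)

func main() {
	// Two rules, applied in order: keep "cf-deployment" deployments, drop "router" jobs.
	filters, err := eventfilter.New("deployment:mustContain:cf-deployment;job:mustNotContain:router")
	if err != nil {
		log.Fatal(err) // e.g. malformed rule or unknown comparator
	}

	deployment, job := "cf-deployment", "diego-cell"
	msg := &events.Envelope{Deployment: &deployment, Job: &job}

	fmt.Println(filters.Length())     // 2
	fmt.Println(filters.Accepts(msg)) // true: both rules pass
}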
93 changes: 93 additions & 0 deletions eventfilter/filters_test.go
@@ -0,0 +1,93 @@
package eventfilter_test

import (
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
"github.com/cloudfoundry/sonde-go/events"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
)

var _ = Describe("Rule parsing", func() {
testError := func(filterConf string, errorMsg string) {
filters, err := eventfilter.New(filterConf)
Expect(filters).To(BeNil())
Expect(err).To(MatchError(ContainSubstring(errorMsg)))
}
DescribeTable("throws error", testError,
Entry("not enough fields", ":", "format must be"),
Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"),
Entry("invalid value", "xxx::", "filter value must not be empty"),
Entry("invalid filter", "xxx:yyy:zzz", "filter key must be one of"),
Entry("invalid field", "notValid:mustContain:zzz", "filter must be one of"),
)

testOk := func(filterConf string, length int) {
filters, err := eventfilter.New(filterConf)
Expect(err).NotTo(HaveOccurred())
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length)
}
DescribeTable("parses ok", testOk,
Entry("no filters at all", "", 0),
Entry("multiple empty rules", ";;;;", 0),
Entry("filtering on deployment", "deployment:mustContain:some deployment", 1),
Entry("accepts whitespace between rules", " deployment:mustContain:something ; origin:mustContain:someOrigin ", 2),
Entry("accepts whitespace in filter", "deployment: mustContain :something", 1),

Entry("inclusion filter on deployment", "Deployment:mustContain:something", 1),
Entry("inclusion filter on origin", "origin:mustContain:something", 1),
Entry("inclusion filter on job", "job:mustContain:something", 1),

Entry("exclusion filter on deployment", "Deployment:mustNotContain:something", 1),
Entry("exclusion filter on origin", "origin:mustNotContain:something", 1),
Entry("exclusion filter on job", "job:mustNotContain:something", 1),
)
})

var _ = Describe("Filtering", func() {
msg := &events.Envelope{
Deployment: p("p-healthwatch2-123123123"),
Origin: p("some origin"),
Job: p("some job"),
}

test := func(filterConf string, expected bool) {
filters, err := eventfilter.New(filterConf)
Expect(err).NotTo(HaveOccurred())
Expect(filters.Accepts(msg)).
To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded"))
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
}

DescribeTable("on", test,
Entry("empty filter conf should accept", "", true),
Entry("matching inclusion filter should accept", "deployment:mustContain:healthwatch2", true),
Entry("non-matching inclusion filter should discard", "deployment:mustContain:something", false),
Entry("matching exclusion filter should discard", "deployment:mustNotContain:healthwatch2", false),
Entry("2nd exclusion filter should discard", "deployment:mustNotContain:health ; deployment:mustNotContain:watch", false),
Entry("3rd exclusion filter should discard",
"deployment:mustContain:health ; job:mustNotContain:other job ; deployment:mustNotContain:watch",
false,
),
Entry("many matching inclusion filters should accept",
"deployment:mustContain:h ; deployment:mustContain:e ; deployment:mustContain:a ; deployment:mustContain:l ; deployment:mustContain:t ; deployment:mustContain:h",
true,
),
Entry("many non-matching exclusion filters should accept",
"deployment:mustNotContain:x ; deployment:mustNotContain:y ; deployment:mustNotContain:z ; deployment:mustNotContain:u ; deployment:mustNotContain:b ; deployment:mustNotContain:r",
true,
),
)
})

func p(s string) *string { return &s }

func tern(b bool, t string, f string) string {
if b {
return t
}

return f
}
53 changes: 42 additions & 11 deletions eventrouter/default.go
@@ -4,6 +4,7 @@ import (
"fmt"

"github.com/cloudfoundry-community/splunk-firehose-nozzle/cache"
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
fevents "github.com/cloudfoundry-community/splunk-firehose-nozzle/events"
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink"
"github.com/cloudfoundry/sonde-go/events"
@@ -18,29 +19,34 @@ type router struct {
config *Config
}

func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) {
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
type filteringRouter struct {
*router
filters eventfilter.Filters
}

func New(appCache cache.Cache, sink eventsink.Sink, config *Config, filters eventfilter.Filters) (Router, error) {
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
if err != nil {
return nil, err
}

return &router{
r := &router{
appCache: appCache,
sink: sink,
selectedEvents: selectedEvents,
config: config,
}, nil
}

func (r *router) Route(msg *events.Envelope) error {
eventType := msg.GetEventType()
}

if _, ok := r.selectedEvents[eventType.String()]; !ok {
// Ignore this event since we are not interested
return nil
// if no filters were defined, we return the original router,
// otherwise we return the filtering router
if filters == nil || filters.Length() < 1 {
return r, nil
}

return &filteringRouter{router: r, filters: filters}, nil
}

func (r *router) processMessage(msg *events.Envelope, eventType events.Envelope_EventType) error {
var event *fevents.Event
switch eventType {
case events.Envelope_HttpStartStop:
@@ -83,5 +89,30 @@ func (r *router) Route(msg *events.Envelope) error {
fields := map[string]interface{}{"err": fmt.Sprintf("%s", err)}
r.sink.Write(fields, "Failed to write events")
}

return err
}

func (r *router) Route(msg *events.Envelope) error {
eventType := msg.GetEventType()
if _, ok := r.selectedEvents[eventType.String()]; !ok {
// Ignore this event since we are not interested
return nil
}

return r.processMessage(msg, eventType)
}

func (r *filteringRouter) Route(msg *events.Envelope) error {
eventType := msg.GetEventType()
if _, ok := r.selectedEvents[eventType.String()]; !ok {
// Ignore this event since we are not interested
return nil
}

if !r.filters.Accepts(msg) {
return nil
}

return r.processMessage(msg, eventType)
}
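
The nozzle can build these filters from the `FILTERS` setting and hand them to the router. A rough wiring sketch (hypothetical helper, not the nozzle's actual config plumbing; it assumes `appCache`, `sink`, and `config` are constructed elsewhere and reads the FILTERS environment variable directly for illustration):

package main

import (
	"os"

	"github.com/cloudfoundry-community/splunk-firehose-nozzle/cache"
	"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
	"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter"
	"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink"
)

func buildRouter(appCache cache.Cache, sink eventsink.Sink, config *eventrouter.Config) (eventrouter.Router, error) {
	filters, err := eventfilter.New(os.Getenv("FILTERS"))
	if err != nil {
		return nil, err
	}

	// With no rules configured, New returns the plain router; otherwise the
	// filtering router drops every envelope that any rule rejects.
	return eventrouter.New(appCache, sink, config, filters)
}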