diff --git a/README.md b/README.md index a50f1cb2..b15bc0c7 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,10 @@ This is recommended for dev environments only. * `BOLTDB_PATH`: Bolt database path. * `EVENTS`: A comma separated list of events to include. Possible values: ValueMetric,CounterEvent,Error,LogMessage,HttpStartStop,ContainerMetric * `EXTRA_FIELDS`: Extra fields to annotate your events with (format is key:value,key:value). +* `FILTERS`: Filter events on deployment, job, or origin. The format is `field₀:comperator₀:value₀;field₁:comperator₁:value₁;…;fieldₙ,comperatorₙ,valueₙ` where + * `` is the messages field to check against, e.g. deployment or job + * `` how to compare the 's value to the user provided , e.g.: mustContain, mustNotContain + * `` the actual value the message's should be checked for * `FLUSH_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for flushing queue to Splunk regardless of CONSUMER_QUEUE_SIZE. Protects against stale events in low throughput systems. * `CONSUMER_QUEUE_SIZE`: Sets the internal consumer queue buffer size. Events will be pushed to Splunk after queue is full. * `HEC_BATCH_SIZE`: Set the batch size for the events to push to HEC (Splunk HTTP Event Collector). diff --git a/eventfilter/eventfilter_suite_test.go b/eventfilter/eventfilter_suite_test.go new file mode 100644 index 00000000..1c594e45 --- /dev/null +++ b/eventfilter/eventfilter_suite_test.go @@ -0,0 +1,13 @@ +package eventfilter_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestEventfilter(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Eventfilter Suite") +} diff --git a/eventfilter/filters.go b/eventfilter/filters.go new file mode 100644 index 00000000..64053314 --- /dev/null +++ b/eventfilter/filters.go @@ -0,0 +1,174 @@ +package eventfilter + +import ( + "fmt" + "strings" + + "github.com/cloudfoundry/sonde-go/events" +) + +const ( + filterSep = ";" + filterValuesSep = ":" +) + +// getterFunc is a function that, given an Envelope, returns the things we care +// about, e.g. the Deployment, Job, ... +type getterFunc = func(msg *events.Envelope) string + +// supportedGetters are all supported keys we can use for filters and the +// getters / functions that pull the respective data out of an envelope. +var supportedGetters = map[string]getterFunc{ + "deployment": func(msg *events.Envelope) string { + return msg.GetDeployment() + }, + "origin": func(msg *events.Envelope) string { + return msg.GetOrigin() + }, + "job": func(msg *events.Envelope) string { + return msg.GetJob() + }, +} + +// filterFunc gets called with data from the message envelope, pulled out by a +// getter, and compares it to the user provided value. If it returns true, this +// message should be accepted, else the message should be dropped. +type filterFunc = func(msgData, userInput string) bool + +// supportedFilters are all supported filter names and +// the filters / functions that match run against the data from the message and +// compares it to the data provided by the user. +// E.g. when we have +// - the filter func strings.Contains +// - the getter, that gets the message's origin +// - and the value 'foo' +// only messages with the origin 'foo' will be accepted by the filter +var supportedFilters = map[string]filterFunc{ + "mustContain": strings.Contains, + "mustNotContain": func(msgData, userInput string) bool { return !strings.Contains(msgData, userInput) }, +} + +// SupportedFilterKeys lists all supported filter keys. This is only used to +// signal the list of supported keys to users, e.g. for the usage text. +var SupportedFilterKeys = func() []string { + keys := make([]string, 0, len(supportedGetters)) + for k := range supportedGetters { + keys = append(keys, k) + } + + return keys +}() + +// SupportedFilters lists all supported filter names. This is only used to +// signal the list of supported filters to users, e.g. for the usage text. +var SupportedFilters = func() []string { + keys := make([]string, 0, len(supportedFilters)) + for k := range supportedFilters { + keys = append(keys, k) + } + + return keys +}() + +// Filters is something that can tell it's Length (the number of its configured +// filters) and can be used to check if an envelope is accepted or should be +// dropped/discarded. +type Filters interface { + Accepts(*events.Envelope) bool + Length() int +} + +type filterRule struct { + getter getterFunc + filter filterFunc + value string +} + +var ( + errInvalidFormat = fmt.Errorf("format must be '%q:%q:'", SupportedFilterKeys, SupportedFilters) + errEmptyValue = fmt.Errorf("filter value must not be empty") + errInvaldFilter = fmt.Errorf("filter key must be one of %q", SupportedFilterKeys) + errInvalidFilterKey = fmt.Errorf("filter must be one of %q", SupportedFilters) +) + +func parseFilterConfig(filters string) ([]filterRule, error) { + rules := []filterRule{} + + for _, filterRaw := range strings.Split(filters, filterSep) { + filter := strings.TrimSpace(filterRaw) + + if filter == "" { + continue + } + + tokens := strings.Split(filter, filterValuesSep) + if len(tokens) != 3 { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat) + } + + getterKey := strings.TrimSpace(strings.ToLower(tokens[0])) + filterKey := strings.TrimSpace(tokens[1]) + + var ok bool + rule := filterRule{ + value: tokens[2], + } + + if rule.value == "" { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue) + } + + rule.filter, ok = supportedFilters[filterKey] + if !ok { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvaldFilter) + } + + rule.getter, ok = supportedGetters[getterKey] + if !ok { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFilterKey) + } + + rules = append(rules, rule) + } + + return rules, nil +} + +type filter func(*events.Envelope) bool + +type filters []filter + +func (ef *filters) Accepts(msg *events.Envelope) bool { + for _, f := range *ef { + if allow := f(msg); !allow { + return false + } + } + + return true +} + +func (ef *filters) Length() int { + return len(*ef) +} + +func (ef *filters) addFilter(valueGetter getterFunc, valueFilter filterFunc, value string) { + *ef = append(*ef, func(msg *events.Envelope) bool { + return valueFilter(valueGetter(msg), value) + }) +} + +func New(filterList string) (Filters, error) { + f := &filters{} + + rules, err := parseFilterConfig(filterList) + if err != nil { + return nil, err + } + + for _, rule := range rules { + f.addFilter(rule.getter, rule.filter, rule.value) + } + + return f, nil +} diff --git a/eventfilter/filters_test.go b/eventfilter/filters_test.go new file mode 100644 index 00000000..aa205712 --- /dev/null +++ b/eventfilter/filters_test.go @@ -0,0 +1,93 @@ +package eventfilter_test + +import ( + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" + "github.com/cloudfoundry/sonde-go/events" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" +) + +var _ = Describe("Rule parsing", func() { + testError := func(filterConf string, errorMsg string) { + filters, err := eventfilter.New(filterConf) + Expect(filters).To(BeNil()) + Expect(err).To(MatchError(ContainSubstring(errorMsg))) + } + DescribeTable("throws error", testError, + Entry("not enough fields", ":", "format must be"), + Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"), + Entry("invalid value", "xxx::", "filter value must not be empty"), + Entry("invalid filter", "xxx:yyy:zzz", "filter key must be one of"), + Entry("invalid field", "notValid:mustContain:zzz", "filter must be one of"), + ) + + testOk := func(filterConf string, length int) { + filters, err := eventfilter.New(filterConf) + Expect(err).NotTo(HaveOccurred()) + Expect(filters).NotTo(BeNil(), "filters have not been initialized") + Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length) + } + DescribeTable("parses ok", testOk, + Entry("no filters at all", "", 0), + Entry("multiple empty rules", ";;;;", 0), + Entry("filtering on deployment", "deployment:mustContain:some deployment", 1), + Entry("accepts whitespace between rules", " deployment:mustContain:something ; origin:mustContain:someOrigin ", 2), + Entry("accepts whitespace in filter", "deployment: mustContain :something", 1), + + Entry("inclusion filter on deployment", "Deployment:mustContain:something", 1), + Entry("inclusion filter on origin", "origin:mustContain:something", 1), + Entry("inclusion filter on job", "job:mustContain:something", 1), + + Entry("exclusion filter on deployment", "Deployment:mustNotContain:something", 1), + Entry("exclusion filter on origin", "origin:mustNotContain:something", 1), + Entry("exclusion filter on job", "job:mustNotContain:something", 1), + ) +}) + +var _ = Describe("Filtering", func() { + msg := &events.Envelope{ + Deployment: p("p-healthwatch2-123123123"), + Origin: p("some origin"), + Job: p("some job"), + } + + test := func(filterConf string, expected bool) { + filters, err := eventfilter.New(filterConf) + Expect(err).NotTo(HaveOccurred()) + Expect(filters.Accepts(msg)). + To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded")) + Expect(filters).NotTo(BeNil(), "filters have not been initialized") + } + + DescribeTable("on", test, + Entry("empty filter conf should accept", "", true), + Entry("matching inclusion filter should accept", "deployment:mustContain:healthwatch2", true), + Entry("non-matching inclusion filter should discard", "deployment:mustContain:something", false), + Entry("matching exclusion filter should discard", "deployment:mustNotContain:healthwatch2", false), + Entry("2nd exclusion filter should discard", "deployment:mustNotContain:health ; deployment:mustNotContain:watch", false), + Entry("3rd exclusion filter should discard", + "deployment:mustContain:health ; job:mustNotContain:other job ; deployment:mustNotContain:watch", + false, + ), + Entry("many matching inclusion filters should accept", + "deployment:mustContain:h ; deployment:mustContain:e ; deployment:mustContain:a ; deployment:mustContain:l ; deployment:mustContain:t ; deployment:mustContain:h", + true, + ), + Entry("many non-matching exclusion filters should accept", + "deployment:mustNotContain:x ; deployment:mustNotContain:y ; deployment:mustNotContain:z ; deployment:mustNotContain:u ; deployment:mustNotContain:b ; deployment:mustNotContain:r", + true, + ), + ) +}) + +func p(s string) *string { return &s } + +func tern(b bool, t string, f string) string { + if b { + return t + } + + return f +} diff --git a/eventrouter/default.go b/eventrouter/default.go index 4f75de9c..ad1c3137 100644 --- a/eventrouter/default.go +++ b/eventrouter/default.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" fevents "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink" "github.com/cloudfoundry/sonde-go/events" @@ -18,29 +19,34 @@ type router struct { config *Config } -func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) { - selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents) +type filteringRouter struct { + *router + filters eventfilter.Filters +} +func New(appCache cache.Cache, sink eventsink.Sink, config *Config, filters eventfilter.Filters) (Router, error) { + selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents) if err != nil { return nil, err } - return &router{ + r := &router{ appCache: appCache, sink: sink, selectedEvents: selectedEvents, config: config, - }, nil -} - -func (r *router) Route(msg *events.Envelope) error { - eventType := msg.GetEventType() + } - if _, ok := r.selectedEvents[eventType.String()]; !ok { - // Ignore this event since we are not interested - return nil + // if no filters were defined, we return the original router, + // otherwise we return the filtering router + if filters == nil || filters.Length() < 1 { + return r, nil } + return &filteringRouter{router: r, filters: filters}, nil +} + +func (r *router) processMessage(msg *events.Envelope, eventType events.Envelope_EventType) error { var event *fevents.Event switch eventType { case events.Envelope_HttpStartStop: @@ -83,5 +89,30 @@ func (r *router) Route(msg *events.Envelope) error { fields := map[string]interface{}{"err": fmt.Sprintf("%s", err)} r.sink.Write(fields, "Failed to write events") } + return err } + +func (r *router) Route(msg *events.Envelope) error { + eventType := msg.GetEventType() + if _, ok := r.selectedEvents[eventType.String()]; !ok { + // Ignore this event since we are not interested + return nil + } + + return r.processMessage(msg, eventType) +} + +func (r *filteringRouter) Route(msg *events.Envelope) error { + eventType := msg.GetEventType() + if _, ok := r.selectedEvents[eventType.String()]; !ok { + // Ignore this event since we are not interested + return nil + } + + if !r.filters.Accepts(msg) { + return nil + } + + return r.processMessage(msg, eventType) +} diff --git a/eventrouter/eventrouter_benchmark_test.go b/eventrouter/eventrouter_benchmark_test.go index f09d390b..21d14371 100644 --- a/eventrouter/eventrouter_benchmark_test.go +++ b/eventrouter/eventrouter_benchmark_test.go @@ -1,7 +1,10 @@ package eventrouter_test import ( + "fmt" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" "github.com/cloudfoundry/sonde-go/events" . "github.com/onsi/ginkgo" @@ -15,8 +18,22 @@ const ( var _ = Describe("eventrouter", func() { var router eventrouter.Router + var filters eventfilter.Filters var sink *devNullSink + measureIt := func(f string, expectedMsgs int) { + title := fmt.Sprintf("message through-put for filter %q", f) + Measure(title, func(b Benchmarker) { + runtime := b.Time("route", func() { + err := pushMessages(router, messagesPerRun) + Expect(err).NotTo(HaveOccurred()) + Expect(sink.msgs).To(Equal(expectedMsgs)) + }) + + b.RecordValue("routed messages per micro second", float64(messagesPerRun)/runtime.Seconds()/float64(1000)) + }, runs) + } + JustBeforeEach(func() { cache := nullCache{} sink = &devNullSink{} @@ -24,20 +41,72 @@ var _ = Describe("eventrouter", func() { SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", } var err error - router, err = eventrouter.New(cache, sink, config) + router, err = eventrouter.New(cache, sink, config, filters) Expect(err).NotTo(HaveOccurred()) }) - Measure("through-put", func(b Benchmarker) { - runtime := b.Time("route", func() { - err := pushMessages(router, messagesPerRun) + measureIt("", messagesPerRun) + + Describe("single accepting filter", func() { + var filterRules = "deployment:mustContain:some deployment" + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) Expect(err).NotTo(HaveOccurred()) - Expect(sink.msgs).To(Equal(messagesPerRun)) }) + measureIt(filterRules, messagesPerRun) + }) + + Describe("multiple filters, first one accepts, others don't match", func() { + filterRules := "deployment:mustContain:ome depl " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 40), messagesPerRun) + }) + + Describe("multiple filters, last one accepts", func() { + filterRules := manyNotMatchingThings + " deployment:mustContain:ome depl" + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(last(filterRules, 40), messagesPerRun) + }) - b.RecordValue("routed messages per micro second", float64(messagesPerRun)/runtime.Seconds()/float64(1000)) - }, runs) + Describe("multiple filters, first one discards", func() { + filterRules := "deployment:mustNotContain:ome depl " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 40), 0) + }) + + Describe("multiple filters, first one accepts, second one discards, others don't match", func() { + filterRules := "deployment:mustContain:ome depl; deployment:mustNotContain:some " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 60), 0) + }) }) +// manyNotMatchingThings is used to create some closures that get passed into +// the router as filters, to be able to measure if and how the number of those +// closures influences the router's performance. +const manyNotMatchingThings = ";" + + "origin:mustNotContain:ignore0; origin:mustNotContain:ignore1; origin:mustNotContain:ignore2; origin:mustNotContain:ignore3;" + + "origin:mustNotContain:ignore4; origin:mustNotContain:ignore5; origin:mustNotContain:ignore6; origin:mustNotContain:ignore7;" + + "origin:mustNotContain:ignore8; origin:mustNotContain:ignore9; origin:mustNotContain:ignore10; origin:mustNotContain:ignore11;" + + "origin:mustNotContain:ignore12; origin:mustNotContain:ignore13; origin:mustNotContain:ignore14; origin:mustNotContain:ignore15;" + + "origin:mustNotContain:ignore16; origin:mustNotContain:ignore17; origin:mustNotContain:ignore18; origin:mustNotContain:ignore19;" + func pushMessages(r eventrouter.Router, nrOfMsgs int) error { events := make(chan *events.Envelope, 10) stop := make(chan struct{}) @@ -93,3 +162,11 @@ func (nullCache) Open() error { return nil } func (nullCache) Close() error { return nil } func (nullCache) GetAllApps() (map[string]*cache.App, error) { return map[string]*cache.App{}, nil } func (nullCache) GetApp(string) (*cache.App, error) { return &cache.App{}, nil } + +func first(s string, l int) string { + return s[0:l] + "[...]" +} + +func last(s string, l int) string { + return "[...]" + s[len(s)-l:] +} diff --git a/eventrouter/eventrouter_test.go b/eventrouter/eventrouter_test.go index 1c7626b2..5e9e4192 100644 --- a/eventrouter/eventrouter_test.go +++ b/eventrouter/eventrouter_test.go @@ -1,6 +1,7 @@ package eventrouter_test import ( + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" . "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/testing" "github.com/cloudfoundry/sonde-go/events" @@ -33,7 +34,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) timestampNano = 1467040874046121775 @@ -84,7 +85,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "HttpStart", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = events.Envelope_HttpStop @@ -98,7 +99,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = events.Envelope_LogMessage @@ -127,7 +128,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "invalid", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = invalid @@ -151,7 +152,95 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "invalid-event", } - _, err = New(noCache, memSink, config) + _, err = New(noCache, memSink, config, nil) Ω(err).Should(HaveOccurred()) }) }) + +var _ = Describe("eventrouter filtering", func() { + var msg *events.Envelope + var router Router + var filters eventfilter.Filters + var sink *devNullSink + + BeforeEach(func() { + msg = &events.Envelope{ + Origin: p("some origin"), + } + sink = &devNullSink{} + }) + + JustBeforeEach(func() { + config := &Config{ + SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", + } + var err error + + router, err = New(nullCache{}, sink, config, filters) + Expect(err).NotTo(HaveOccurred()) + + err = router.Route(msg) + Expect(err).NotTo(HaveOccurred()) + }) + + Context("with a nil filter", func() { + BeforeEach(func() { + filters = nil + }) + It("routes the message", func() { + Expect(sink.msgs).To(Equal(1)) + }) + }) + + Context("with a filter that blocks everything", func() { + BeforeEach(func() { + filters = blockingFilter{} + }) + It("discards the message", func() { + Expect(sink.msgs).To(Equal(0)) + }) + }) + + Context("with an accepting origin filter", func() { + BeforeEach(func() { + filters = originFilter{allowedOrigin: msg.GetOrigin()} + }) + It("routes the message", func() { + Expect(sink.msgs).To(Equal(1)) + }) + }) + + Context("with an blocking origin filter", func() { + BeforeEach(func() { + filters = originFilter{allowedOrigin: "Invalid origin"} + }) + It("discards the message", func() { + Expect(sink.msgs).To(Equal(0)) + }) + }) +}) + +type fakeFilter struct{} + +func (fakeFilter) Length() int { return 1 } + +type allowingFilter struct{ fakeFilter } + +func (allowingFilter) Accepts(m *events.Envelope) bool { + return true +} + +type blockingFilter struct{ fakeFilter } + +func (blockingFilter) Accepts(m *events.Envelope) bool { + return false +} + +type originFilter struct { + fakeFilter + allowedOrigin string +} + +func (of originFilter) Accepts(m *events.Envelope) bool { + return of.allowedOrigin == m.GetOrigin() +} diff --git a/eventsink/splunk_test.go b/eventsink/splunk_test.go index 60c1e9d1..4bcf7e16 100644 --- a/eventsink/splunk_test.go +++ b/eventsink/splunk_test.go @@ -61,7 +61,7 @@ var _ = Describe("Splunk", func() { rconfig := &eventrouter.Config{ SelectedEvents: "ContainerMetric, CounterEvent, Error, HttpStart, HttpStartStop, HttpStop, LogMessage, ValueMetric", } - eventRouter, err = eventrouter.New(cache.NewNoCache(), memSink, rconfig) + eventRouter, err = eventrouter.New(cache.NewNoCache(), memSink, rconfig, nil) Ω(err).ShouldNot(HaveOccurred()) mockClient = &testing.EventWriterMock{} diff --git a/splunknozzle/config.go b/splunknozzle/config.go index be8f095b..ad3bd8bc 100644 --- a/splunknozzle/config.go +++ b/splunknozzle/config.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" kingpin "gopkg.in/alecthomas/kingpin.v2" @@ -41,6 +42,8 @@ type Config struct { WantedEvents string `json:"wanted-events"` ExtraFields string `json:"extra-fields"` + Filters string `json:"filters"` + FlushInterval time.Duration `json:"flush-interval"` QueueSize int `json:"queue-size"` BatchSize int `json:"batch-size"` @@ -121,6 +124,23 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config { kingpin.Flag("extra-fields", "Extra fields you want to annotate your events with, example: '--extra-fields=env:dev,something:other "). OverrideDefaultFromEnvar("EXTRA_FIELDS").Default("").StringVar(&c.ExtraFields) + kingpin.Flag("filters", fmt.Sprintf( + "Filter events. A valid filter looks like '::'. Multiple of those "+ + "filters can be given by separating them with ';', in which case they run in the order given. "+ + " is what part of a message to check against, must be one of %q. "+ + " how to compare the 's value against , must be one of %q. "+ + " what to check a message for. "+ + "The more filters are specified, the more work needs to be done per incoming event. "+ + "Example: given the filter "+ + "'deployment:mustContain:healthwatch2; deployment:mustNotContain:exporters; job:mustContain:someJob' "+ + "and events coming from the deployments %v, only events from the deployment %q and "+ + "the job %q would be forwarded, all other events would be dropped.", + eventfilter.SupportedFilterKeys, eventfilter.SupportedFilters, + []string{"p-healthwatch2-UUID", "p-healthwatch2-exporters-UUID", "cf-UUID"}, + "p-healthwatch2-UUID", "someJob", + )). + OverrideDefaultFromEnvar("FILTERS").Default("").StringVar(&c.Filters) + kingpin.Flag("flush-interval", "Every interval flushes to Splunk Http Event Collector server"). OverrideDefaultFromEnvar("FLUSH_INTERVAL").Default("5s").DurationVar(&c.FlushInterval) kingpin.Flag("consumer-queue-size", "Consumer queue buffer size"). diff --git a/splunknozzle/config_test.go b/splunknozzle/config_test.go index 28485ec5..ca453947 100644 --- a/splunknozzle/config_test.go +++ b/splunknozzle/config_test.go @@ -66,6 +66,8 @@ var _ = Describe("Config", func() { os.Setenv("ENABLE_EVENT_TRACING", "true") os.Setenv("DEBUG", "true") + os.Setenv("FILTERS", "some filters") + c := NewConfigFromCmdFlags(version, branch, commit, buildos) Expect(c.ApiEndpoint).To(Equal("api.bosh-lite.com")) @@ -112,6 +114,8 @@ var _ = Describe("Config", func() { Expect(c.TraceLogging).To(BeTrue()) Expect(c.Debug).To(BeTrue()) + + Expect(c.Filters).To(Equal("some filters")) }) It("check defaults", func() { @@ -145,6 +149,8 @@ var _ = Describe("Config", func() { Expect(c.TraceLogging).To(BeFalse()) Expect(c.Debug).To(BeFalse()) + + Expect(c.Filters).To(Equal("")) }) }) @@ -192,6 +198,7 @@ var _ = Describe("Config", func() { "--splunk-version=5.2", "--enable-event-tracing", "--debug", + "--filters=some filters", } os.Args = args }) @@ -243,6 +250,7 @@ var _ = Describe("Config", func() { Expect(c.Commit).To(Equal(commit)) Expect(c.BuildOS).To(Equal(buildos)) + Expect(c.Filters).To(Equal("some filters")) }) }) }) diff --git a/splunknozzle/nozzle.go b/splunknozzle/nozzle.go index e68b03c5..92bc213e 100644 --- a/splunknozzle/nozzle.go +++ b/splunknozzle/nozzle.go @@ -8,6 +8,7 @@ import ( "code.cloudfoundry.org/lager" cfclient "github.com/cloudfoundry-community/go-cfclient" "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink" @@ -42,7 +43,13 @@ func (s *SplunkFirehoseNozzle) EventRouter(cache cache.Cache, eventSink eventsin AddSpaceName: strings.Contains(LowerAddAppInfo, "spacename"), AddSpaceGuid: strings.Contains(LowerAddAppInfo, "spaceguid"), } - return eventrouter.New(cache, eventSink, config) + + filter, err := eventfilter.New(s.config.Filters) + if err != nil { + return nil, err + } + + return eventrouter.New(cache, eventSink, config, filter) } // CFClient creates a client object which can talk to Cloud Foundry