Skip to content

Commit

Permalink
NETOBSERV-1692: allow KEEP filtering logic (#740)
Browse files Browse the repository at this point in the history
* NETOBSERV-1692: allow KEEP filtering logic

* rename keep_entry_all_satisfied

* Share filter predicate code (prom+transform)

Move prom-encode predicates filtering code to its own package and share
it with "keep_entry" transforms

* fix merge conflicts
  • Loading branch information
jotak authored Nov 26, 2024
1 parent 671eb8e commit 0fa789c
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 117 deletions.
14 changes: 14 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Following is the supported API format for filter transformations:
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
keep_entry_all_satisfied: keeps the entry if the set of rules are all satisfied
add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -187,6 +188,19 @@ Following is the supported API format for filter transformations:
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntryAllSatisfied: configuration for keep_entry rule
type: (enum) one of the following:
keep_entry_if_exists: keeps the entry if the field exists
keep_entry_if_doesnt_exist: keeps the entry if the field does not exist
keep_entry_if_equal: keeps the entry if the field value equals specified value
keep_entry_if_not_equal: keeps the entry if the field value does not equal specified value
keep_entry_if_regex_match: keeps the entry if the field value matches the specified regex
keep_entry_if_not_regex_match: keeps the entry if the field value does not match the specified regex
keepEntry: configuration for keep_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntrySampling: sampling value for keep_entry type: 1 flow on <sampling> is kept
addField: configuration for add_field rule
input: entry input field
value: specified value of input field:
Expand Down
22 changes: 22 additions & 0 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied
KeepEntryAllSatisfied TransformFilterEnum = "keep_entry_all_satisfied" // keeps the entry if the set of rules are all satisfied
AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist
AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -55,11 +56,24 @@ const (
RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
)

type TransformFilterKeepEntryEnum string

const (
KeepEntryIfExists TransformFilterKeepEntryEnum = "keep_entry_if_exists" // keeps the entry if the field exists
KeepEntryIfDoesntExist TransformFilterKeepEntryEnum = "keep_entry_if_doesnt_exist" // keeps the entry if the field does not exist
KeepEntryIfEqual TransformFilterKeepEntryEnum = "keep_entry_if_equal" // keeps the entry if the field value equals specified value
KeepEntryIfNotEqual TransformFilterKeepEntryEnum = "keep_entry_if_not_equal" // keeps the entry if the field value does not equal specified value
KeepEntryIfRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_regex_match" // keeps the entry if the field value matches the specified regex
KeepEntryIfNotRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_not_regex_match" // keeps the entry if the field value does not match the specified regex
)

type TransformFilterRule struct {
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
KeepEntryAllSatisfied []*KeepEntryRule `yaml:"keepEntryAllSatisfied,omitempty" json:"keepEntryAllSatisfied,omitempty" doc:"configuration for keep_entry rule"`
KeepEntrySampling uint16 `yaml:"keepEntrySampling,omitempty" json:"keepEntrySampling,omitempty" doc:"sampling value for keep_entry type: 1 flow on <sampling> is kept"`
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
Expand All @@ -79,6 +93,9 @@ func (r *TransformFilterRule) preprocess() {
for i := range r.RemoveEntryAllSatisfied {
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
}
for i := range r.KeepEntryAllSatisfied {
r.KeepEntryAllSatisfied[i].KeepEntry.preprocess()
}
for i := range r.ConditionalSampling {
r.ConditionalSampling[i].preprocess()
}
Expand Down Expand Up @@ -110,6 +127,11 @@ type RemoveEntryRule struct {
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
}

type KeepEntryRule struct {
Type TransformFilterKeepEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
KeepEntry *TransformFilterGenericRule `yaml:"keepEntry,omitempty" json:"keepEntry,omitempty" doc:"configuration for keep_entry_* rules"`
}

type SamplingCondition struct {
Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
Expand Down
107 changes: 12 additions & 95 deletions pkg/pipeline/encode/metrics/preprocess.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package metrics

import (
"fmt"
"regexp"
"strings"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
)

type Predicate func(config.GenericMap) bool

var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`)

type Preprocessed struct {
*api.MetricsItem
filters []preprocessedFilter
Expand All @@ -27,7 +21,7 @@ type MappedLabel struct {
}

type preprocessedFilter struct {
predicate Predicate
predicate filters.Predicate
useFlat bool
}

Expand All @@ -42,102 +36,25 @@ func (p *Preprocessed) TargetLabels() []string {
return targetLabels
}

func Presence(filter api.MetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return found
}
}

func Absence(filter api.MetricsFilter) Predicate {
pred := Presence(filter)
return func(flow config.GenericMap) bool { return !pred(flow) }
}

func Equal(filter api.MetricsFilter) Predicate {
varLookups := extractVarLookups(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
value := filter.Value
if len(varLookups) > 0 {
value = injectVars(flow, value, varLookups)
}
return sVal == value
}
return false
}
}

func NotEqual(filter api.MetricsFilter) Predicate {
pred := Equal(filter)
return func(flow config.GenericMap) bool { return !pred(flow) }
}

func Regex(filter api.MetricsFilter) Predicate {
r, _ := regexp.Compile(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return r.MatchString(sVal)
}
return false
}
}

func NotRegex(filter api.MetricsFilter) Predicate {
pred := Regex(filter)
return func(flow config.GenericMap) bool { return !pred(flow) }
}

func filterToPredicate(filter api.MetricsFilter) Predicate {
func filterToPredicate(filter api.MetricsFilter) filters.Predicate {
switch filter.Type {
case api.MetricFilterEqual:
return Equal(filter)
return filters.Equal(filter.Key, filter.Value, true)
case api.MetricFilterNotEqual:
return NotEqual(filter)
return filters.NotEqual(filter.Key, filter.Value, true)
case api.MetricFilterPresence:
return Presence(filter)
return filters.Presence(filter.Key)
case api.MetricFilterAbsence:
return Absence(filter)
return filters.Absence(filter.Key)
case api.MetricFilterRegex:
return Regex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.Regex(filter.Key, r)
case api.MetricFilterNotRegex:
return NotRegex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.NotRegex(filter.Key, r)
}
// Default = Exact
return Equal(filter)
}

func extractVarLookups(value string) [][]string {
// Extract list of variables to lookup
// E.g: filter "$(SrcAddr):$(SrcPort)" would return [SrcAddr,SrcPort]
if len(value) > 0 {
return variableExtractor.FindAllStringSubmatch(value, -1)
}
return nil
}

func injectVars(flow config.GenericMap, filterValue string, varLookups [][]string) string {
injected := filterValue
for _, matchGroup := range varLookups {
var value string
if rawVal, found := flow[matchGroup[1]]; found {
if sVal, ok := rawVal.(string); ok {
value = sVal
} else {
value = utils.ConvertToString(rawVal)
}
}
injected = strings.ReplaceAll(injected, matchGroup[0], value)
}
return injected
return filters.Equal(filter.Key, filter.Value, true)
}

func Preprocess(def *api.MetricsItem) *Preprocessed {
Expand Down
9 changes: 0 additions & 9 deletions pkg/pipeline/encode/metrics/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_Filters_extractVarLookups(t *testing.T) {
variables := extractVarLookups("$(abc)--$(def)")

assert.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables)

variables = extractVarLookups("")
assert.Empty(t, variables)
}

func Test_Flatten(t *testing.T) {
pp := Preprocess(&api.MetricsItem{Flatten: []string{"interfaces", "events"}})
fl := pp.GenerateFlatParts(config.GenericMap{
Expand Down
63 changes: 59 additions & 4 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
"github.com/sirupsen/logrus"
)

Expand All @@ -37,14 +38,32 @@ var (
)

type Filter struct {
Rules []api.TransformFilterRule
Rules []api.TransformFilterRule
KeepRules []predicatesRule
}

type predicatesRule struct {
predicates []filters.Predicate
sampling uint16
}

// Transform transforms a flow; if false is returned as a second argument, the entry is dropped
func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
tlog.Tracef("f = %v", f)
outputEntry := entry.Copy()
labels := make(map[string]string)
if len(f.KeepRules) > 0 {
keep := false
for _, r := range f.KeepRules {
if applyPredicates(outputEntry, r) {
keep = true
break
}
}
if !keep {
return nil, false
}
}
for i := range f.Rules {
tlog.Tracef("rule = %v", f.Rules[i])
if cont := applyRule(outputEntry, labels, &f.Rules[i]); !cont {
Expand Down Expand Up @@ -143,6 +162,9 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied)
case api.ConditionalSampling:
return sample(entry, rule.ConditionalSampling)
case api.KeepEntryAllSatisfied:
// This should be processed only in "applyPredicates". Failure to do so is a bug.
tlog.Panicf("unexpected KeepEntryAllSatisfied: %v", rule)
default:
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
}
Expand All @@ -159,25 +181,58 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
return true
}

func applyPredicates(entry config.GenericMap, rule predicatesRule) bool {
if !rollSampling(rule.sampling) {
return false
}
for _, p := range rule.predicates {
if !p(entry) {
return false
}
}
return true
}

func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
for _, r := range rules {
if isRemoveEntrySatisfied(entry, r.Rules) {
return r.Value == 0 || (rndgen.Intn(int(r.Value)) == 0)
return rollSampling(r.Value)
}
}
return true
}

func rollSampling(value uint16) bool {
return value == 0 || (rndgen.Intn(int(value)) == 0)
}

// NewTransformFilter create a new filter transform
func NewTransformFilter(params config.StageParam) (Transformer, error) {
tlog.Debugf("entering NewTransformFilter")
keepRules := []predicatesRule{}
rules := []api.TransformFilterRule{}
if params.Transform != nil && params.Transform.Filter != nil {
params.Transform.Filter.Preprocess()
rules = params.Transform.Filter.Rules
for i := range params.Transform.Filter.Rules {
baseRules := &params.Transform.Filter.Rules[i]
if baseRules.Type == api.KeepEntryAllSatisfied {
pr := predicatesRule{sampling: baseRules.KeepEntrySampling}
for _, keepRule := range baseRules.KeepEntryAllSatisfied {
pred, err := filters.FromKeepEntry(keepRule)
if err != nil {
return nil, err
}
pr.predicates = append(pr.predicates, pred)
}
keepRules = append(keepRules, pr)
} else {
rules = append(rules, *baseRules)
}
}
}
transformFilter := &Filter{
Rules: rules,
Rules: rules,
KeepRules: keepRules,
}
return transformFilter, nil
}
Loading

0 comments on commit 0fa789c

Please sign in to comment.