diff --git a/docs/api.md b/docs/api.md index 31ab782dc..7c4b369e4 100644 --- a/docs/api.md +++ b/docs/api.md @@ -13,10 +13,10 @@ Following is the supported API format for prometheus encode: agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage filter: an optional criterion to filter entries by. Deprecated: use filters instead. key: the key to match and filter by - value: the value to match and filter by + value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'. filters: a list of criteria to filter entries by key: the key to match and filter by - value: the value to match and filter by + value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'. valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric buckets: histogram buckets diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 3b1e4af68..77d189bb9 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -61,5 +61,5 @@ type PromMetricsItems []PromMetricsItem type PromMetricsFilter struct { Key string `yaml:"key" json:"key" doc:"the key to match and filter by"` - Value string `yaml:"value" json:"value" doc:"the value to match and filter by"` + Value string `yaml:"value" json:"value" doc:"the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'."` } diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 45c8d28d2..22e9e890d 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -27,6 +27,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "k8s.io/utils/strings/slices" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -205,14 +206,26 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} { for _, filter := range info.GetFilters() { - if val, found := flow[filter.Key]; found { - sVal, ok := val.(string) - if !ok { - sVal = fmt.Sprint(val) + val, found := flow[filter.Key] + switch filter.Value { + case "nil": + if found { + return nil } - if sVal != filter.Value { + case "!nil": + if !found { return nil } + default: + if found { + sVal, ok := val.(string) + if !ok { + sVal = fmt.Sprint(val) + } + if !slices.Contains(strings.Split(filter.Value, "|"), sVal) { + return nil + } + } } } if info.ValueKey == "" { diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 9840eda30..0555991f5 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -279,6 +279,107 @@ func Test_FilterDuplicates(t *testing.T) { require.Contains(t, exposed, `bytes_filtered 8`) } +func Test_FilterNotNil(t *testing.T) { + metrics := []config.GenericMap{ + { + "srcIP": "20.0.0.2", + "dstIP": "10.0.0.1", + "latency": 0.1, + }, + { + "srcIP": "20.0.0.2", + "dstIP": "10.0.0.1", + "latency": 0.9, + }, + { + "srcIP": "20.0.0.2", + "dstIP": "10.0.0.1", + }, + } + params := api.PromEncode{ + Prefix: "test_", + ExpiryTime: api.Duration{ + Duration: time.Duration(60 * time.Second), + }, + Metrics: []api.PromMetricsItem{ + { + Name: "latencies", + Type: "histogram", + ValueKey: "latency", + Filters: []api.PromMetricsFilter{{Key: "latency", Value: "!nil"}}, + }, + }, + } + + encodeProm, err := initProm(¶ms) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t) + + require.Contains(t, exposed, `test_latencies_sum 1`) + require.Contains(t, exposed, `test_latencies_count 2`) +} + +func Test_FilterDirection(t *testing.T) { + metrics := []config.GenericMap{ + { + "dir": 0, // ingress + "packets": 10, + }, + { + "dir": 1, // egress + "packets": 100, + }, + { + "dir": 2, // inner + "packets": 1000, + }, + } + params := api.PromEncode{ + Prefix: "test_", + ExpiryTime: api.Duration{ + Duration: time.Duration(60 * time.Second), + }, + Metrics: []api.PromMetricsItem{ + { + Name: "ingress_packets_total", + Type: "counter", + ValueKey: "packets", + Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0"}}, + }, + { + Name: "egress_packets_total", + Type: "counter", + ValueKey: "packets", + Filters: []api.PromMetricsFilter{{Key: "dir", Value: "1"}}, + }, + { + Name: "ingress_or_inner_packets_total", + Type: "counter", + ValueKey: "packets", + Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0|2"}}, + }, + }, + } + + encodeProm, err := initProm(¶ms) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t) + + require.Contains(t, exposed, `test_ingress_packets_total 10`) + require.Contains(t, exposed, `test_egress_packets_total 100`) + require.Contains(t, exposed, `test_ingress_or_inner_packets_total 1010`) +} + func Test_MetricTTL(t *testing.T) { metrics := []config.GenericMap{{ "srcIP": "20.0.0.2",