Skip to content

Commit

Permalink
NETOBSERV-1352 enhance prom filters for RTT metrics (#478) (#519)
Browse files Browse the repository at this point in the history
* enhance prom filters

* a|b filter match

Co-authored-by: Julien Pinsonneau <[email protected]>
  • Loading branch information
jotak and jpinsonneau authored Oct 12, 2023
1 parent 058bc4e commit d6f1e7d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ Following is the supported API format for prometheus encode:
agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage
filter: an optional criterion to filter entries by. Deprecated: use filters instead.
key: the key to match and filter by
value: the value to match and filter by
value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'.
filters: a list of criteria to filter entries by
key: the key to match and filter by
value: the value to match and filter by
value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'.
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
buckets: histogram buckets
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ type PromMetricsItems []PromMetricsItem

type PromMetricsFilter struct {
Key string `yaml:"key" json:"key" doc:"the key to match and filter by"`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by"`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'."`
}
23 changes: 18 additions & 5 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"k8s.io/utils/strings/slices"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -205,14 +206,26 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} {
for _, filter := range info.GetFilters() {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
val, found := flow[filter.Key]
switch filter.Value {
case "nil":
if found {
return nil
}
if sVal != filter.Value {
case "!nil":
if !found {
return nil
}
default:
if found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
if !slices.Contains(strings.Split(filter.Value, "|"), sVal) {
return nil
}
}
}
}
if info.ValueKey == "" {
Expand Down
101 changes: 101 additions & 0 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,107 @@ func Test_FilterDuplicates(t *testing.T) {
require.Contains(t, exposed, `bytes_filtered 8`)
}

func Test_FilterNotNil(t *testing.T) {
metrics := []config.GenericMap{
{
"srcIP": "20.0.0.2",
"dstIP": "10.0.0.1",
"latency": 0.1,
},
{
"srcIP": "20.0.0.2",
"dstIP": "10.0.0.1",
"latency": 0.9,
},
{
"srcIP": "20.0.0.2",
"dstIP": "10.0.0.1",
},
}
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.PromMetricsItem{
{
Name: "latencies",
Type: "histogram",
ValueKey: "latency",
Filters: []api.PromMetricsFilter{{Key: "latency", Value: "!nil"}},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t)

require.Contains(t, exposed, `test_latencies_sum 1`)
require.Contains(t, exposed, `test_latencies_count 2`)
}

func Test_FilterDirection(t *testing.T) {
metrics := []config.GenericMap{
{
"dir": 0, // ingress
"packets": 10,
},
{
"dir": 1, // egress
"packets": 100,
},
{
"dir": 2, // inner
"packets": 1000,
},
}
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.PromMetricsItem{
{
Name: "ingress_packets_total",
Type: "counter",
ValueKey: "packets",
Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0"}},
},
{
Name: "egress_packets_total",
Type: "counter",
ValueKey: "packets",
Filters: []api.PromMetricsFilter{{Key: "dir", Value: "1"}},
},
{
Name: "ingress_or_inner_packets_total",
Type: "counter",
ValueKey: "packets",
Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0|2"}},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t)

require.Contains(t, exposed, `test_ingress_packets_total 10`)
require.Contains(t, exposed, `test_egress_packets_total 100`)
require.Contains(t, exposed, `test_ingress_or_inner_packets_total 1010`)
}

func Test_MetricTTL(t *testing.T) {
metrics := []config.GenericMap{{
"srcIP": "20.0.0.2",
Expand Down

0 comments on commit d6f1e7d

Please sign in to comment.