NETOBSERV-1935: enable metrics from list/nested fields (#739)
* NETOBSERV-1935: enable metrics from list/nested fields

This makes it possible to generate metrics with labels/filters set on
list fields (e.g. interfaces) and nested fields (e.g. the upcoming
structured network events).

In the metrics API, the user must explicitly configure which list fields need to
be "flattened" in order to be consumable as filters/labels.

Nested fields can be consumed as filters/labels using the ">" character.

E.g. `flatten: [networkEvents], filters: [{key: "networkEvents>type", value: "acl"}], labels: [networkEvents>name]`
is a sample config that filters a metric on ACL events and labels it by name.
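
As a rough illustration, the same sample config expressed with the Go API types touched by this change might look like the sketch below (the `networkEvents` field name and the metric name are illustrative placeholders, not part of this commit):

```go
package example

import "github.com/netobserv/flowlogs-pipeline/pkg/api"

// Sketch only: count bytes of flows carrying an ACL network event,
// labeled by the event name. Field and metric names are placeholders.
var aclEventsMetric = api.PromEncode{
	Metrics: []api.MetricsItem{{
		Name:     "acl_events_bytes_total",
		Type:     "counter",
		ValueKey: "bytes",
		// List fields must be explicitly flattened to be usable as filters/labels.
		Flatten: []string{"networkEvents"},
		// Nested fields are addressed with the ">" separator.
		Filters: []api.MetricsFilter{
			{Key: "networkEvents>type", Value: "acl", Type: api.MetricFilterEqual},
		},
		Labels: []string{"namespace", "networkEvents>name"},
		// Remap the nested label to a plain Prometheus label name.
		Remap: map[string]string{"networkEvents>name": "name"},
	}},
}
```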

* linter
jotak authored Nov 14, 2024
1 parent 5bb2146 commit 5fba26a
Showing 15 changed files with 569 additions and 118 deletions.
2 changes: 2 additions & 0 deletions docs/api.md
@@ -30,6 +30,7 @@ Following is the supported API format for prometheus encode:
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
flatten: list fields to be flattened
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
prefix: prefix added to each metric name
@@ -444,6 +445,7 @@ Following is the supported API format for writing metrics to an OpenTelemetry collector:
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
remap: optional remapping of labels
flatten: list fields to be flattened
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
pushTimeInterval: how often should metrics be sent to collector:
1 change: 1 addition & 0 deletions pkg/api/encode_prom.go
@@ -53,6 +53,7 @@ type MetricsItem struct {
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Remap map[string]string `yaml:"remap" json:"remap" doc:"optional remapping of labels"`
Flatten []string `yaml:"flatten" json:"flatten" doc:"list fields to be flattened"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"`
}
4 changes: 4 additions & 0 deletions pkg/confgen/confgen_test.go
@@ -152,6 +152,7 @@ func Test_RunShortConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[3].Encode.Prom)
@@ -234,6 +235,7 @@ func Test_RunConfGenNoAgg(t *testing.T) {
ValueKey: "Bytes",
Labels: []string{"service"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[2].Encode.Prom)
@@ -339,6 +341,7 @@ func Test_RunLongConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}, {
Name: "test_histo",
@@ -347,6 +350,7 @@ func Test_RunLongConfGen(t *testing.T) {
ValueKey: "test_aggregates_value",
Labels: []string{"groupByKeys", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
}, out.Parameters[4].Encode.Prom)
3 changes: 2 additions & 1 deletion pkg/config/pipeline_builder_test.go
@@ -138,6 +138,7 @@ func TestKafkaPromPipeline(t *testing.T) {
ValueKey: "recent_count",
Labels: []string{"by", "aggregate"},
Remap: map[string]string{},
Flatten: []string{},
Buckets: []float64{},
}},
Prefix: "flp_",
@@ -171,7 +172,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"flatten":[],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
15 changes: 8 additions & 7 deletions pkg/pipeline/encode/encode_prom.go
@@ -25,6 +25,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
@@ -114,25 +115,25 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}

func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}

func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector {
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels())
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
@@ -176,10 +177,10 @@ func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
}

// returns true if a registry restart is needed
func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *MetricInfo) prometheus.Collector) bool {
func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *metrics.Preprocessed) prometheus.Collector) bool {
fullMetricName := prefix + apiItem.Name
plog.Debugf("Checking metric: %s", fullMetricName)
mInfo := CreateMetricInfo(apiItem)
mInfo := metrics.Preprocess(apiItem)
if oldMetric, ok := store[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.TargetLabels(), oldMetric.info.TargetLabels()) {
plog.Debug("Changes detected in labels")
@@ -257,7 +258,7 @@ func (e *EncodeProm) resetRegistry() {
for i := range e.cfg.Metrics {
mCfg := &e.cfg.Metrics[i]
fullMetricName := e.cfg.Prefix + mCfg.Name
mInfo := CreateMetricInfo(mCfg)
mInfo := metrics.Preprocess(mCfg)
plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, mInfo.TargetLabels())
var m prometheus.Collector
switch mCfg.Type {
213 changes: 198 additions & 15 deletions pkg/pipeline/encode/encode_prom_test.go
@@ -543,9 +543,6 @@ func Test_MissingLabels(t *testing.T) {
},
}
params := api.PromEncode{
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.MetricsItem{
{
Name: "my_counter",
@@ -588,9 +585,6 @@ func Test_Remap(t *testing.T) {
},
}
params := api.PromEncode{
ExpiryTime: api.Duration{
Duration: time.Duration(60 * time.Second),
},
Metrics: []api.MetricsItem{
{
Name: "my_counter",
@@ -616,6 +610,204 @@ func Test_Remap(t *testing.T) {
require.Contains(t, exposed, `my_counter{ip="10.0.0.3",namespace="B"} 4`)
}

func Test_WithListField(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"interfaces": []string{"eth0", "123456"},
"bytes": 7,
},
{
"namespace": "A",
"interfaces": []any{"eth0", "abcdef"},
"bytes": 1,
},
{
"namespace": "A",
"interfaces": []any{"eth0", "xyz"},
"bytes": 10,
},
{
"namespace": "B",
"bytes": 2,
},
{
"namespace": "C",
"interfaces": []string{},
"bytes": 3,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "my_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "interfaces", Value: "xyz", Type: api.MetricFilterNotEqual},
},
Flatten: []string{"interfaces"},
Labels: []string{"namespace", "interfaces"},
Remap: map[string]string{"interfaces": "interface"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `my_counter{interface="eth0",namespace="A"} 18`)
require.Contains(t, exposed, `my_counter{interface="123456",namespace="A"} 7`)
require.Contains(t, exposed, `my_counter{interface="abcdef",namespace="A"} 1`)
require.Contains(t, exposed, `my_counter{interface="",namespace="B"} 2`)
require.Contains(t, exposed, `my_counter{interface="",namespace="C"} 3`)
require.NotContains(t, exposed, `"xyz"`)
}

func Test_WithObjectListField(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 1,
},
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
{
"namespace": "B",
"bytes": 2,
},
{
"namespace": "C",
"events": []string{},
"bytes": 3,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "events>type", Value: "acl", Type: api.MetricFilterEqual},
},
Labels: []string{"namespace", "events>name"},
Flatten: []string{"events"},
Remap: map[string]string{"events>name": "name"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 11`)
require.NotContains(t, exposed, `"my_egress"`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func Test_WithObjectListField_bis(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Filters: []api.MetricsFilter{
{Key: "events>type", Value: "acl", Type: api.MetricFilterEqual},
},
Flatten: []string{"events"},
Labels: []string{"namespace"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{namespace="A"} 10`)
require.NotContains(t, exposed, `"my_egress"`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func Test_WithObjectListField_ter(t *testing.T) {
metrics := []config.GenericMap{
{
"namespace": "A",
"events": []any{
config.GenericMap{"type": "egress", "name": "my_egress"},
config.GenericMap{"type": "acl", "name": "my_policy"},
},
"bytes": 10,
},
}
params := api.PromEncode{
Metrics: []api.MetricsItem{
{
Name: "policy_counter",
Type: "counter",
ValueKey: "bytes",
Labels: []string{"namespace", "events>name"},
Flatten: []string{"events"},
Remap: map[string]string{"events>name": "name"},
},
},
}

encodeProm, err := initProm(&params)
require.NoError(t, err)
for _, metric := range metrics {
encodeProm.Encode(metric)
}
time.Sleep(100 * time.Millisecond)

exposed := test.ReadExposedMetrics(t, encodeProm.server)

require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 10`)
require.Contains(t, exposed, `policy_counter{name="my_egress",namespace="A"} 10`)
require.NotContains(t, exposed, `"B"`)
require.NotContains(t, exposed, `"C"`)
}

func buildFlow() config.GenericMap {
return config.GenericMap{
"srcIP": "10.0.0." + strconv.Itoa(rand.Intn(20)),
@@ -754,12 +946,3 @@ func Test_MultipleProm(t *testing.T) {

// TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe)
}

func Test_Filters_extractVarLookups(t *testing.T) {
variables := extractVarLookups("$(abc)--$(def)")

require.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables)

variables = extractVarLookups("")
require.Empty(t, variables)
}
28 changes: 28 additions & 0 deletions pkg/pipeline/encode/metrics/filtering.go
@@ -0,0 +1,28 @@
package metrics

import "github.com/netobserv/flowlogs-pipeline/pkg/config"

func (p *Preprocessed) ApplyFilters(flow config.GenericMap, flatParts []config.GenericMap) (bool, []config.GenericMap) {
filteredParts := flatParts
for _, filter := range p.filters {
if filter.useFlat {
filteredParts = filter.filterFlatParts(filteredParts)
if len(filteredParts) == 0 {
return false, nil
}
} else if !filter.predicate(flow) {
return false, nil
}
}
return true, filteredParts
}

func (pf *preprocessedFilter) filterFlatParts(flatParts []config.GenericMap) []config.GenericMap {
var filteredParts []config.GenericMap
for _, part := range flatParts {
if pf.predicate(part) {
filteredParts = append(filteredParts, part)
}
}
return filteredParts
}
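
For context, here is a minimal sketch of how this helper might be driven, assuming the flow has already been flattened into per-element parts by the rest of the metrics package (the exact shape of those parts, with keys like `events>type`, is an assumption for illustration and not taken from this commit):

```go
package example

import (
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics"
)

// Sketch only, not part of this commit: filter one flow and its flattened parts.
// The part layout below is assumed for illustration.
func applyFiltersSketch(item *api.MetricsItem) []config.GenericMap {
	pre := metrics.Preprocess(item) // item has Flatten and Filters configured

	flow := config.GenericMap{"namespace": "A", "bytes": 10}
	flatParts := []config.GenericMap{
		{"events>type": "acl", "events>name": "my_policy"},
		{"events>type": "egress", "events>name": "my_egress"},
	}

	// Flow-level predicates are checked against the flow itself; filters on
	// flattened keys are applied per part, dropping parts that do not match.
	ok, kept := pre.ApplyFilters(flow, flatParts)
	if !ok {
		return nil // the whole flow is filtered out
	}
	return kept // each kept part contributes one label set for the metric
}
```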