From 5fba26acf751665927eea0d740b9f1d112d9f989 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 14 Nov 2024 09:11:55 +0100 Subject: [PATCH] NETOBSERV-1935: enable metrics from list/nested fields (#739) * NETOBSERV-1935: enable metrics from list/nested fields This makes it possible to generate metrics with labels/filters set on list fields (e.g. interfaces) and nested fields (e.g. soon-coming structured network events) In metrics API, user needs to configure explicitly which list field needs to be "flattened", in order to be consumable as filters/labels. Nested fields can be consumed as filters/labels with the ">" character; E.g: `flatten: [networkEvents], filters: [{key: "networkEvents>type", value: "acl"}], labels: [networkEvents>name]` This is a sample config to filter a metric for ACL events and label it by name * linter --- docs/api.md | 2 + pkg/api/encode_prom.go | 1 + pkg/confgen/confgen_test.go | 4 + pkg/config/pipeline_builder_test.go | 3 +- pkg/pipeline/encode/encode_prom.go | 15 +- pkg/pipeline/encode/encode_prom_test.go | 213 ++++++++++++++++-- pkg/pipeline/encode/metrics/filtering.go | 28 +++ pkg/pipeline/encode/metrics/flattening.go | 88 ++++++++ .../preprocess.go} | 56 +++-- .../encode/metrics/preprocess_test.go | 53 +++++ pkg/pipeline/encode/metrics_common.go | 209 +++++++++++------ .../opentelemetry/encode_otlpmetrics.go | 3 +- pkg/pipeline/extract/aggregate/aggregate.go | 2 +- pkg/pipeline/ingest/ingest_grpc.go | 7 +- pkg/pipeline/write/write_loki.go | 3 +- 15 files changed, 569 insertions(+), 118 deletions(-) create mode 100644 pkg/pipeline/encode/metrics/filtering.go create mode 100644 pkg/pipeline/encode/metrics/flattening.go rename pkg/pipeline/encode/{encode_prom_metric.go => metrics/preprocess.go} (71%) create mode 100644 pkg/pipeline/encode/metrics/preprocess_test.go diff --git a/docs/api.md b/docs/api.md index 19fb3f54a..ccecab368 100644 --- a/docs/api.md +++ b/docs/api.md @@ -30,6 +30,7 @@ Following is the supported API format for 
prometheus encode: valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric remap: optional remapping of labels + flatten: list fields to be flattened buckets: histogram buckets valueScale: scale factor of the value (MetricVal := FlowVal / Scale) prefix: prefix added to each metric name @@ -444,6 +445,7 @@ Following is the supported API format for writing metrics to an OpenTelemetry co valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric remap: optional remapping of labels + flatten: list fields to be flattened buckets: histogram buckets valueScale: scale factor of the value (MetricVal := FlowVal / Scale) pushTimeInterval: how often should metrics be sent to collector: diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 97222b9f8..6780a43a2 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -53,6 +53,7 @@ type MetricsItem struct { ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` Remap map[string]string `yaml:"remap" json:"remap" doc:"optional remapping of labels"` + Flatten []string `yaml:"flatten" json:"flatten" doc:"list fields to be flattened"` Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` } diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 0c1762cf5..a1ee1ba4a 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -152,6 +152,7 @@ func Test_RunShortConfGen(t *testing.T) { ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, Remap: map[string]string{}, + Flatten: []string{}, Buckets: []float64{}, }}, }, out.Parameters[3].Encode.Prom) @@ -234,6 +235,7 @@ 
func Test_RunConfGenNoAgg(t *testing.T) { ValueKey: "Bytes", Labels: []string{"service"}, Remap: map[string]string{}, + Flatten: []string{}, Buckets: []float64{}, }}, }, out.Parameters[2].Encode.Prom) @@ -339,6 +341,7 @@ func Test_RunLongConfGen(t *testing.T) { ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, Remap: map[string]string{}, + Flatten: []string{}, Buckets: []float64{}, }, { Name: "test_histo", @@ -347,6 +350,7 @@ func Test_RunLongConfGen(t *testing.T) { ValueKey: "test_aggregates_value", Labels: []string{"groupByKeys", "aggregate"}, Remap: map[string]string{}, + Flatten: []string{}, Buckets: []float64{}, }}, }, out.Parameters[4].Encode.Prom) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 2f8d8d357..deb1a77bc 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -138,6 +138,7 @@ func TestKafkaPromPipeline(t *testing.T) { ValueKey: "recent_count", Labels: []string{"by", "aggregate"}, Remap: map[string]string{}, + Flatten: []string{}, Buckets: []float64{}, }}, Prefix: "flp_", @@ -171,7 +172,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[4]) require.NoError(t, err) - require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b)) + require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"flatten":[],"remap":{},"buckets":[]}],"prefix":"flp_"}}}`, string(b)) } func TestForkPipeline(t *testing.T) { diff --git a/pkg/pipeline/encode/encode_prom.go 
b/pkg/pipeline/encode/encode_prom.go index 6a5be93d4..dbf974991 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -25,6 +25,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics" promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -114,25 +115,25 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { cleanupFunc.(func())() } -func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { +func (e *EncodeProm) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector { counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddCounter(fullMetricName, counter, mInfo) return counter } -func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { +func (e *EncodeProm) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector { gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddGauge(fullMetricName, gauge, mInfo) return gauge } -func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { +func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector { histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddHist(fullMetricName, histogram, mInfo) return histogram } -func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { +func (e *EncodeProm) addAgghistogram(fullMetricName 
string, mInfo *metrics.Preprocessed) prometheus.Collector { agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.TargetLabels()) e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo) return agghistogram @@ -176,10 +177,10 @@ func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) { } // returns true if a registry restart is needed -func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *MetricInfo) prometheus.Collector) bool { +func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *metrics.Preprocessed) prometheus.Collector) bool { fullMetricName := prefix + apiItem.Name plog.Debugf("Checking metric: %s", fullMetricName) - mInfo := CreateMetricInfo(apiItem) + mInfo := metrics.Preprocess(apiItem) if oldMetric, ok := store[fullMetricName]; ok { if !reflect.DeepEqual(mInfo.TargetLabels(), oldMetric.info.TargetLabels()) { plog.Debug("Changes detected in labels") @@ -257,7 +258,7 @@ func (e *EncodeProm) resetRegistry() { for i := range e.cfg.Metrics { mCfg := &e.cfg.Metrics[i] fullMetricName := e.cfg.Prefix + mCfg.Name - mInfo := CreateMetricInfo(mCfg) + mInfo := metrics.Preprocess(mCfg) plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, mInfo.TargetLabels()) var m prometheus.Collector switch mCfg.Type { diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 91266d619..8c940f0cd 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -543,9 +543,6 @@ func Test_MissingLabels(t *testing.T) { }, } params := api.PromEncode{ - ExpiryTime: api.Duration{ - Duration: time.Duration(60 * time.Second), - }, Metrics: []api.MetricsItem{ { Name: "my_counter", @@ -588,9 +585,6 @@ func Test_Remap(t *testing.T) { }, } params := api.PromEncode{ - ExpiryTime: 
api.Duration{ - Duration: time.Duration(60 * time.Second), - }, Metrics: []api.MetricsItem{ { Name: "my_counter", @@ -616,6 +610,204 @@ func Test_Remap(t *testing.T) { require.Contains(t, exposed, `my_counter{ip="10.0.0.3",namespace="B"} 4`) } +func Test_WithListField(t *testing.T) { + metrics := []config.GenericMap{ + { + "namespace": "A", + "interfaces": []string{"eth0", "123456"}, + "bytes": 7, + }, + { + "namespace": "A", + "interfaces": []any{"eth0", "abcdef"}, + "bytes": 1, + }, + { + "namespace": "A", + "interfaces": []any{"eth0", "xyz"}, + "bytes": 10, + }, + { + "namespace": "B", + "bytes": 2, + }, + { + "namespace": "C", + "interfaces": []string{}, + "bytes": 3, + }, + } + params := api.PromEncode{ + Metrics: []api.MetricsItem{ + { + Name: "my_counter", + Type: "counter", + ValueKey: "bytes", + Filters: []api.MetricsFilter{ + {Key: "interfaces", Value: "xyz", Type: api.MetricFilterNotEqual}, + }, + Flatten: []string{"interfaces"}, + Labels: []string{"namespace", "interfaces"}, + Remap: map[string]string{"interfaces": "interface"}, + }, + }, + } + + encodeProm, err := initProm(&params) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t, encodeProm.server) + + require.Contains(t, exposed, `my_counter{interface="eth0",namespace="A"} 18`) + require.Contains(t, exposed, `my_counter{interface="123456",namespace="A"} 7`) + require.Contains(t, exposed, `my_counter{interface="abcdef",namespace="A"} 1`) + require.Contains(t, exposed, `my_counter{interface="",namespace="B"} 2`) + require.Contains(t, exposed, `my_counter{interface="",namespace="C"} 3`) + require.NotContains(t, exposed, `"xyz"`) +} + +func Test_WithObjectListField(t *testing.T) { + metrics := []config.GenericMap{ + { + "namespace": "A", + "events": []any{ + config.GenericMap{"type": "acl", "name": "my_policy"}, + }, + "bytes": 1, + }, + { + "namespace": "A", + "events": []any{ + 
config.GenericMap{"type": "egress", "name": "my_egress"}, + config.GenericMap{"type": "acl", "name": "my_policy"}, + }, + "bytes": 10, + }, + { + "namespace": "B", + "bytes": 2, + }, + { + "namespace": "C", + "events": []string{}, + "bytes": 3, + }, + } + params := api.PromEncode{ + Metrics: []api.MetricsItem{ + { + Name: "policy_counter", + Type: "counter", + ValueKey: "bytes", + Filters: []api.MetricsFilter{ + {Key: "events>type", Value: "acl", Type: api.MetricFilterEqual}, + }, + Labels: []string{"namespace", "events>name"}, + Flatten: []string{"events"}, + Remap: map[string]string{"events>name": "name"}, + }, + }, + } + + encodeProm, err := initProm(&params) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t, encodeProm.server) + + require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 11`) + require.NotContains(t, exposed, `"my_egress"`) + require.NotContains(t, exposed, `"B"`) + require.NotContains(t, exposed, `"C"`) +} + +func Test_WithObjectListField_bis(t *testing.T) { + metrics := []config.GenericMap{ + { + "namespace": "A", + "events": []any{ + config.GenericMap{"type": "egress", "name": "my_egress"}, + config.GenericMap{"type": "acl", "name": "my_policy"}, + }, + "bytes": 10, + }, + } + params := api.PromEncode{ + Metrics: []api.MetricsItem{ + { + Name: "policy_counter", + Type: "counter", + ValueKey: "bytes", + Filters: []api.MetricsFilter{ + {Key: "events>type", Value: "acl", Type: api.MetricFilterEqual}, + }, + Flatten: []string{"events"}, + Labels: []string{"namespace"}, + }, + }, + } + + encodeProm, err := initProm(&params) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t, encodeProm.server) + + require.Contains(t, exposed, `policy_counter{namespace="A"} 10`) + require.NotContains(t, exposed, 
`"my_egress"`) + require.NotContains(t, exposed, `"B"`) + require.NotContains(t, exposed, `"C"`) +} + +func Test_WithObjectListField_ter(t *testing.T) { + metrics := []config.GenericMap{ + { + "namespace": "A", + "events": []any{ + config.GenericMap{"type": "egress", "name": "my_egress"}, + config.GenericMap{"type": "acl", "name": "my_policy"}, + }, + "bytes": 10, + }, + } + params := api.PromEncode{ + Metrics: []api.MetricsItem{ + { + Name: "policy_counter", + Type: "counter", + ValueKey: "bytes", + Labels: []string{"namespace", "events>name"}, + Flatten: []string{"events"}, + Remap: map[string]string{"events>name": "name"}, + }, + }, + } + + encodeProm, err := initProm(&params) + require.NoError(t, err) + for _, metric := range metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t, encodeProm.server) + + require.Contains(t, exposed, `policy_counter{name="my_policy",namespace="A"} 10`) + require.Contains(t, exposed, `policy_counter{name="my_egress",namespace="A"} 10`) + require.NotContains(t, exposed, `"B"`) + require.NotContains(t, exposed, `"C"`) +} + func buildFlow() config.GenericMap { return config.GenericMap{ "srcIP": "10.0.0." 
+ strconv.Itoa(rand.Intn(20)), @@ -754,12 +946,3 @@ func Test_MultipleProm(t *testing.T) { // TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe) } - -func Test_Filters_extractVarLookups(t *testing.T) { - variables := extractVarLookups("$(abc)--$(def)") - - require.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables) - - variables = extractVarLookups("") - require.Empty(t, variables) -} diff --git a/pkg/pipeline/encode/metrics/filtering.go b/pkg/pipeline/encode/metrics/filtering.go new file mode 100644 index 000000000..885d4ae2a --- /dev/null +++ b/pkg/pipeline/encode/metrics/filtering.go @@ -0,0 +1,28 @@ +package metrics + +import "github.com/netobserv/flowlogs-pipeline/pkg/config" + +func (p *Preprocessed) ApplyFilters(flow config.GenericMap, flatParts []config.GenericMap) (bool, []config.GenericMap) { + filteredParts := flatParts + for _, filter := range p.filters { + if filter.useFlat { + filteredParts = filter.filterFlatParts(filteredParts) + if len(filteredParts) == 0 { + return false, nil + } + } else if !filter.predicate(flow) { + return false, nil + } + } + return true, filteredParts +} + +func (pf *preprocessedFilter) filterFlatParts(flatParts []config.GenericMap) []config.GenericMap { + var filteredParts []config.GenericMap + for _, part := range flatParts { + if pf.predicate(part) { + filteredParts = append(filteredParts, part) + } + } + return filteredParts +} diff --git a/pkg/pipeline/encode/metrics/flattening.go b/pkg/pipeline/encode/metrics/flattening.go new file mode 100644 index 000000000..e9f27ebbd --- /dev/null +++ b/pkg/pipeline/encode/metrics/flattening.go @@ -0,0 +1,88 @@ +package metrics + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/config" +) + +func (p *Preprocessed) GenerateFlatParts(flow config.GenericMap) []config.GenericMap { + if len(p.MetricsItem.Flatten) == 0 { + return nil + } + // Want to generate sub-flows from {A=foo, B=[{B1=x, B2=y},{B1=z}], C=[foo,bar]} + 
// => {B>B1=x, B>B2=y, C=foo}, {B>B1=z, C=foo}, {B>B1=x, B>B2=y, C=bar}, {B>B1=z, C=bar} + var partsPerLabel [][]config.GenericMap + for _, fl := range p.MetricsItem.Flatten { + if anyVal, ok := flow[fl]; ok { + // Intermediate step to get: + // [{B>B1=x, B>B2=y}, {B>B1=z}], [C=foo, C=bar] + var partsForLabel []config.GenericMap + switch v := anyVal.(type) { + case []any: + prefix := fl + ">" + for _, vv := range v { + switch vvv := vv.(type) { + case config.GenericMap: + partsForLabel = append(partsForLabel, flattenNested(prefix, vvv)) + default: + partsForLabel = append(partsForLabel, config.GenericMap{fl: vv}) + } + } + case []config.GenericMap: + prefix := fl + ">" + for _, vv := range v { + partsForLabel = append(partsForLabel, flattenNested(prefix, vv)) + } + case []string: + for _, vv := range v { + partsForLabel = append(partsForLabel, config.GenericMap{fl: vv}) + } + } + if len(partsForLabel) > 0 { + partsPerLabel = append(partsPerLabel, partsForLabel) + } + } + } + return distribute(partsPerLabel) +} + +func distribute(allUnflat [][]config.GenericMap) []config.GenericMap { + // turn + // [{B>B1=x, B>B2=y}, {B>B1=z}], [{C=foo}, {C=bar}] + // into + // [{B>B1=x, B>B2=y, C=foo}, {B>B1=z, C=foo}, {B>B1=x, B>B2=y, C=bar}, {B>B1=z, C=bar}] + totalCard := 1 + for _, part := range allUnflat { + if len(part) > 1 { + totalCard *= len(part) + } + } + ret := make([]config.GenericMap, totalCard) + indexes := make([]int, len(allUnflat)) + for c := range ret { + ret[c] = config.GenericMap{} + incIndex := false + for i, part := range allUnflat { + index := indexes[i] + for k, v := range part[index] { + ret[c][k] = v + } + if !incIndex { + if index+1 == len(part) { + indexes[i] = 0 + } else { + indexes[i] = index + 1 + incIndex = true + } + } + } + } + return ret +} + +func flattenNested(prefix string, nested config.GenericMap) config.GenericMap { + subFlow := config.GenericMap{} + for k, v := range nested { + subFlow[prefix+k] = v + } + return subFlow +} diff --git 
a/pkg/pipeline/encode/encode_prom_metric.go b/pkg/pipeline/encode/metrics/preprocess.go similarity index 71% rename from pkg/pipeline/encode/encode_prom_metric.go rename to pkg/pipeline/encode/metrics/preprocess.go index 2e37dd11d..9043f9424 100644 --- a/pkg/pipeline/encode/encode_prom_metric.go +++ b/pkg/pipeline/encode/metrics/preprocess.go @@ -1,4 +1,4 @@ -package encode +package metrics import ( "fmt" @@ -7,16 +7,18 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" ) -type Predicate func(flow config.GenericMap) bool +type Predicate func(config.GenericMap) bool var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`) -type MetricInfo struct { +type Preprocessed struct { *api.MetricsItem - FilterPredicates []Predicate - MappedLabels []MappedLabel + filters []preprocessedFilter + MappedLabels []MappedLabel + FlattenedLabels []MappedLabel } type MappedLabel struct { @@ -24,9 +26,17 @@ type MappedLabel struct { Target string } -func (m *MetricInfo) TargetLabels() []string { +type preprocessedFilter struct { + predicate Predicate + useFlat bool +} + +func (p *Preprocessed) TargetLabels() []string { var targetLabels []string - for _, l := range m.MappedLabels { + for _, l := range p.FlattenedLabels { + targetLabels = append(targetLabels, l.Target) + } + for _, l := range p.MappedLabels { targetLabels = append(targetLabels, l.Target) } return targetLabels @@ -40,10 +50,8 @@ func Presence(filter api.MetricsFilter) Predicate { } func Absence(filter api.MetricsFilter) Predicate { - return func(flow config.GenericMap) bool { - _, found := flow[filter.Key] - return !found - } + pred := Presence(filter) + return func(flow config.GenericMap) bool { return !pred(flow) } } func Equal(filter api.MetricsFilter) Predicate { @@ -124,7 +132,7 @@ func injectVars(flow config.GenericMap, filterValue string, varLookups [][]strin if sVal, ok := rawVal.(string); ok { 
value = sVal } else { - value = fmt.Sprint(rawVal) + value = utils.ConvertToString(rawVal) } } injected = strings.ReplaceAll(injected, matchGroup[0], value) @@ -132,8 +140,8 @@ func injectVars(flow config.GenericMap, filterValue string, varLookups [][]strin return injected } -func CreateMetricInfo(def *api.MetricsItem) *MetricInfo { - mi := MetricInfo{ +func Preprocess(def *api.MetricsItem) *Preprocessed { + mi := Preprocessed{ MetricsItem: def, } for _, l := range def.Labels { @@ -141,10 +149,26 @@ func CreateMetricInfo(def *api.MetricsItem) *MetricInfo { if as := def.Remap[l]; as != "" { ml.Target = as } - mi.MappedLabels = append(mi.MappedLabels, ml) + if mi.isFlattened(l) { + mi.FlattenedLabels = append(mi.FlattenedLabels, ml) + } else { + mi.MappedLabels = append(mi.MappedLabels, ml) + } } for _, f := range def.Filters { - mi.FilterPredicates = append(mi.FilterPredicates, filterToPredicate(f)) + mi.filters = append(mi.filters, preprocessedFilter{ + predicate: filterToPredicate(f), + useFlat: mi.isFlattened(f.Key), + }) } return &mi } + +func (p *Preprocessed) isFlattened(fieldPath string) bool { + for _, flat := range p.Flatten { + if fieldPath == flat || strings.HasPrefix(fieldPath, flat+">") { + return true + } + } + return false +} diff --git a/pkg/pipeline/encode/metrics/preprocess_test.go b/pkg/pipeline/encode/metrics/preprocess_test.go new file mode 100644 index 000000000..93a3ac089 --- /dev/null +++ b/pkg/pipeline/encode/metrics/preprocess_test.go @@ -0,0 +1,53 @@ +package metrics + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/stretchr/testify/assert" +) + +func Test_Filters_extractVarLookups(t *testing.T) { + variables := extractVarLookups("$(abc)--$(def)") + + assert.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables) + + variables = extractVarLookups("") + assert.Empty(t, variables) +} + +func Test_Flatten(t *testing.T) { + pp := 
Preprocess(&api.MetricsItem{Flatten: []string{"interfaces", "events"}}) + fl := pp.GenerateFlatParts(config.GenericMap{ + "namespace": "A", + "interfaces": []string{"eth0", "123456"}, + "events": []any{ + config.GenericMap{"type": "egress", "name": "my_egress"}, + config.GenericMap{"type": "acl", "name": "my_policy"}, + }, + "bytes": 7, + }) + assert.Equal(t, []config.GenericMap{ + { + "interfaces": "eth0", + "events>type": "egress", + "events>name": "my_egress", + }, + { + "interfaces": "123456", + "events>type": "egress", + "events>name": "my_egress", + }, + { + "interfaces": "eth0", + "events>type": "acl", + "events>name": "my_policy", + }, + { + "interfaces": "123456", + "events>type": "acl", + "events>name": "my_policy", + }, + }, fl) +} diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go index f87e9a35c..d420f1af8 100644 --- a/pkg/pipeline/encode/metrics_common.go +++ b/pkg/pipeline/encode/metrics_common.go @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -32,7 +33,7 @@ import ( type mInfoStruct struct { genericMetric interface{} // can be a counter, gauge, or histogram pointer - info *MetricInfo + info *metrics.Preprocessed } type MetricsCommonStruct struct { @@ -84,22 +85,22 @@ var ( ) ) -func (m *MetricsCommonStruct) AddCounter(name string, g interface{}, info *MetricInfo) { +func (m *MetricsCommonStruct) AddCounter(name string, g interface{}, info *metrics.Preprocessed) { mStruct := mInfoStruct{genericMetric: g, info: info} m.counters[name] = mStruct } -func (m *MetricsCommonStruct) AddGauge(name string, g interface{}, info *MetricInfo) { +func (m 
*MetricsCommonStruct) AddGauge(name string, g interface{}, info *metrics.Preprocessed) { mStruct := mInfoStruct{genericMetric: g, info: info} m.gauges[name] = mStruct } -func (m *MetricsCommonStruct) AddHist(name string, g interface{}, info *MetricInfo) { +func (m *MetricsCommonStruct) AddHist(name string, g interface{}, info *metrics.Preprocessed) { mStruct := mInfoStruct{genericMetric: g, info: info} m.histos[name] = mStruct } -func (m *MetricsCommonStruct) AddAggHist(name string, g interface{}, info *MetricInfo) { +func (m *MetricsCommonStruct) AddAggHist(name string, g interface{}, info *metrics.Preprocessed) { mStruct := mInfoStruct{genericMetric: g, info: info} m.aggHistos[name] = mStruct } @@ -109,91 +110,116 @@ func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, met // Process counters for _, mInfo := range m.counters { - labels, value, _ := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) - if labels == nil { + labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labelSets == nil { continue } - err := mci.ProcessCounter(mInfo.genericMetric, labels, value) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue + for _, labels := range labelSets { + err := mci.ProcessCounter(mInfo.genericMetric, labels.lMap, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + m.metricsProcessed.Inc() } - m.metricsProcessed.Inc() } // Process gauges for _, mInfo := range m.gauges { - labels, value, key := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) - if labels == nil { + labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labelSets == nil { continue } 
- err := mci.ProcessGauge(mInfo.genericMetric, labels, value, key) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue + for _, labels := range labelSets { + err := mci.ProcessGauge(mInfo.genericMetric, labels.lMap, value, labels.key) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + m.metricsProcessed.Inc() } - m.metricsProcessed.Inc() } // Process histograms for _, mInfo := range m.histos { - labels, value, _ := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) - if labels == nil { + labelSets, value := m.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labelSets == nil { continue } - err := mci.ProcessHist(mInfo.genericMetric, labels, value) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue + for _, labels := range labelSets { + err := mci.ProcessHist(mInfo.genericMetric, labels.lMap, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + m.metricsProcessed.Inc() } - m.metricsProcessed.Inc() } // Process pre-aggregated histograms for _, mInfo := range m.aggHistos { - labels, values := m.prepareAggHisto(mci, metricRecord, mInfo.info, mInfo.genericMetric) - if labels == nil { + labelSets, values := m.prepareAggHisto(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labelSets == nil { continue } - err := mci.ProcessAggHist(mInfo.genericMetric, labels, values) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - 
m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue + for _, labels := range labelSets { + err := mci.ProcessAggHist(mInfo.genericMetric, labels.lMap, values) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + m.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + m.metricsProcessed.Inc() } - m.metricsProcessed.Inc() } } -func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mv interface{}) (map[string]string, float64, string) { +func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow config.GenericMap, info *metrics.Preprocessed, mv interface{}) ([]labelsKeyAndMap, float64) { + flatParts := info.GenerateFlatParts(flow) + ok, flatParts := info.ApplyFilters(flow, flatParts) + if !ok { + return nil, 0 + } + val := m.extractGenericValue(flow, info) if val == nil { - return nil, 0, "" + return nil, 0 } floatVal, err := utils.ConvertToFloat64(val) if err != nil { m.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() - return nil, 0, "" + return nil, 0 } if info.ValueScale != 0 { floatVal /= info.ValueScale } - entryLabels, key := extractLabelsAndKey(flow, info) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - cacheEntry := mci.GetChacheEntry(entryLabels, mv) - ok := m.mCache.UpdateCacheEntry(key, cacheEntry) - if !ok { - m.metricsDropped.Inc() - return nil, 0, "" + labelSets := extractLabels(flow, flatParts, info) + var lkms []labelsKeyAndMap + for _, ls := range labelSets { + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + lkm := ls.toKeyAndMap(info) + lkms = append(lkms, lkm) + cacheEntry := mci.GetChacheEntry(lkm.lMap, mv) + ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, 0 + } } - return 
entryLabels, floatVal, key + return lkms, floatVal } -func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mc interface{}) (map[string]string, []float64) { +func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *metrics.Preprocessed, mc interface{}) ([]labelsKeyAndMap, []float64) { + flatParts := info.GenerateFlatParts(flow) + ok, flatParts := info.ApplyFilters(flow, flatParts) + if !ok { + return nil, nil + } + val := m.extractGenericValue(flow, info) if val == nil { return nil, nil @@ -204,23 +230,23 @@ func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow c return nil, nil } - entryLabels, key := extractLabelsAndKey(flow, info) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - cacheEntry := mci.GetChacheEntry(entryLabels, mc) - ok = m.mCache.UpdateCacheEntry(key, cacheEntry) - if !ok { - m.metricsDropped.Inc() - return nil, nil + labelSets := extractLabels(flow, flatParts, info) + var lkms []labelsKeyAndMap + for _, ls := range labelSets { + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + lkm := ls.toKeyAndMap(info) + lkms = append(lkms, lkm) + cacheEntry := mci.GetChacheEntry(lkm.lMap, mc) + ok := m.mCache.UpdateCacheEntry(lkm.key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, nil + } } - return entryLabels, values + return lkms, values } -func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} { - for _, pred := range info.FilterPredicates { - if !pred(flow) { - return nil - } - } +func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *metrics.Preprocessed) interface{} { if info.ValueKey == "" { // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 return 1 @@ -233,21 +259,58 @@ func (m *MetricsCommonStruct) 
extractGenericValue(flow config.GenericMap, info * return val } -func extractLabelsAndKey(flow config.GenericMap, info *MetricInfo) (map[string]string, string) { - entryLabels := make(map[string]string, len(info.MappedLabels)) +type label struct { + key string + value string +} + +type labelSet []label + +type labelsKeyAndMap struct { + key string + lMap map[string]string +} + +func (l labelSet) toKeyAndMap(info *metrics.Preprocessed) labelsKeyAndMap { key := strings.Builder{} key.WriteString(info.Name) key.WriteRune('|') - for _, t := range info.MappedLabels { - value := "" - if v, ok := flow[t.Source]; ok { - value = utils.ConvertToString(v) - } - entryLabels[t.Target] = value - key.WriteString(value) + m := map[string]string{} + for _, kv := range l { + key.WriteString(kv.value) key.WriteRune('|') + m[kv.key] = kv.value + } + return labelsKeyAndMap{key: key.String(), lMap: m} +} + +// extractLabels takes the flow and a single metric definition as input. +// It returns the flat labels maps (label names and values). +// Most of the time it will return a single map; it may return several of them when the parsed flow fields are lists (e.g. "interfaces"). +func extractLabels(flow config.GenericMap, flatParts []config.GenericMap, info *metrics.Preprocessed) []labelSet { + common := newLabelSet(flow, info.MappedLabels) + if len(flatParts) == 0 { + return []labelSet{common} + } + var all []labelSet + for _, fp := range flatParts { + ls := newLabelSet(fp, info.FlattenedLabels) + ls = append(ls, common...) 
+ all = append(all, ls) + } + return all +} + +func newLabelSet(part config.GenericMap, labels []metrics.MappedLabel) labelSet { + var ls labelSet + for _, t := range labels { + label := label{key: t.Target, value: ""} + if v, ok := part[t.Source]; ok { + label.value = utils.ConvertToString(v) + } + ls = append(ls, label) } - return entryLabels, key.String() + return ls } func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCallback) { diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 12edd1d98..92200cece 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -25,6 +25,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/metrics" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -135,7 +136,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar fullMetricName := cfg.Prefix + mCfg.Name log.Debugf("fullMetricName = %v", fullMetricName) log.Debugf("Labels = %v", mCfg.Labels) - mInfo := encode.CreateMetricInfo(mCfg) + mInfo := metrics.Preprocess(mCfg) switch mCfg.Type { case api.MetricCounter: counter, err := meter.Float64Counter(fullMetricName) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 308fa2d01..22aa4292e 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -71,7 +71,7 @@ func (aggregate *Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bo if !ok { allLabelsFound = false } - labels[key] = fmt.Sprint(value) + labels[key] = util.ConvertToString(value) } return labels, allLabelsFound 
diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index db9750b02..16afb8d76 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -7,7 +7,8 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" @@ -60,7 +61,7 @@ func NewGRPCProtobuf(opMetrics *operational.Metrics, params config.StageParam) ( func (no *GRPCProtobuf) Ingest(out chan<- config.GenericMap) { no.metrics.createOutQueueLen(out) go func() { - <-utils.ExitChannel() + <-pUtils.ExitChannel() close(no.flowPackets) no.collector.Close() }() @@ -108,7 +109,7 @@ func instrumentGRPC(m *metrics) grpc2.UnaryServerInterceptor { if err != nil { // "trace" level used to minimize performance impact glog.Tracef("Reporting metric error: %v", err) - m.error(fmt.Sprint(status.Code(err))) + m.error(utils.ConvertToString(status.Code(err))) } // Stage duration diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index 865c0d043..ceea8450d 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -27,6 +27,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" logAdapter "github.com/go-kit/kit/log/logrus" jsonIter "github.com/json-iterator/go" @@ -173,7 +174,7 @@ func (l *Loki) addLabels(record config.GenericMap, labels model.LabelSet) { if !ok { 
continue } - lv := model.LabelValue(fmt.Sprint(val)) + lv := model.LabelValue(utils.ConvertToString(val)) if !lv.IsValid() { log.WithFields(logrus.Fields{"key": label, "value": val}). Debug("Invalid label value. Ignoring it")