diff --git a/pkg/api/api.go b/pkg/api/api.go index 7b441d447..c71697c12 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -46,6 +46,10 @@ const ( AddServiceRuleType = "add_service" AddKubernetesRuleType = "add_kubernetes" ReinterpretDirectionRuleType = "reinterpret_direction" + PromFilterExact = "exact" + PromFilterPresence = "presence" + PromFilterAbsence = "absence" + PromFilterRegex = "regex" TagYaml = "yaml" TagDoc = "doc" diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 77d189bb9..c557ec031 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -41,13 +41,14 @@ func PromEncodeOperationName(operation string) string { } type PromMetricsItem struct { - Name string `yaml:"name" json:"name" doc:"the metric name"` - Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` - Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. Deprecated: use filters instead."` - Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"` - ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` - Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` + Name string `yaml:"name" json:"name" doc:"the metric name"` + Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` + Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. 
Deprecated: use filters instead."` + Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"` + ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` + Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` + Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` + ValueScale float64 `yaml:"valueScale" json:"valueScale" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` } func (i *PromMetricsItem) GetFilters() []PromMetricsFilter { @@ -61,5 +62,17 @@ type PromMetricsItems []PromMetricsItem type PromMetricsFilter struct { Key string `yaml:"key" json:"key" doc:"the key to match and filter by"` - Value string `yaml:"value" json:"value" doc:"the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'."` + Value string `yaml:"value" json:"value" doc:"the value to match and filter by"` + Type string `yaml:"type" json:"type" enum:"PromEncodeFilterTypeEnum" doc:"the type of filter match: exact (default), presence, absence or regex"` +} + +type PromEncodeFilterTypeEnum struct { + Exact string `yaml:"exact" json:"exact" doc:"match exactly the provided filter value"` + Presence string `yaml:"presence" json:"presence" doc:"filter key must be present (filter value is ignored)"` + Absence string `yaml:"absence" json:"absence" doc:"filter key must be absent (filter value is ignored)"` + Regex string `yaml:"regex" json:"regex" doc:"match filter value as a regular expression"` +} + +func PromEncodeFilterTypeName(t string) string { + return GetEnumName(PromEncodeFilterTypeEnum{}, t) } diff --git a/pkg/api/enum.go b/pkg/api/enum.go index 93b54893f..4b8deab0b 100644 --- a/pkg/api/enum.go +++ b/pkg/api/enum.go @@ -24,6 +24,7 @@ import ( type enums struct { PromEncodeOperationEnum PromEncodeOperationEnum + 
PromEncodeFilterTypeEnum PromEncodeFilterTypeEnum TransformNetworkOperationEnum TransformNetworkOperationEnum TransformFilterOperationEnum TransformFilterOperationEnum TransformGenericOperationEnum TransformGenericOperationEnum diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 36bc105e6..aaa368418 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -166,7 +166,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[4]) require.NoError(t, err) - require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","value":""},"filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b)) + require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","type":"","value":""},"filters":[{"key":"name","type":"","value":"src_as_connection_count"}],"valueKey":"recent_count","valueScale":0,"labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b)) } func TestForkPipeline(t *testing.T) { diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 22e9e890d..f13cf093c 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -27,7 +27,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "k8s.io/utils/strings/slices" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -37,17 +36,17 @@ const defaultExpiryTime = time.Duration(2 * time.Minute) type gaugeInfo struct { gauge *prometheus.GaugeVec - info api.PromMetricsItem + info 
*metricInfo } type counterInfo struct { counter *prometheus.CounterVec - info api.PromMetricsItem + info *metricInfo } type histoInfo struct { histo *prometheus.HistogramVec - info api.PromMetricsItem + info *metricInfo } type EncodeProm struct { @@ -97,7 +96,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) { // Process counters for _, mInfo := range e.counters { - labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.counter.MetricVec) + labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec) if labels == nil { continue } @@ -113,7 +112,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) { // Process gauges for _, mInfo := range e.gauges { - labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.gauge.MetricVec) + labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec) if labels == nil { continue } @@ -129,7 +128,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) { // Process histograms for _, mInfo := range e.histos { - labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.histo.MetricVec) + labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec) if labels == nil { continue } @@ -145,7 +144,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) { // Process pre-aggregated histograms for _, mInfo := range e.aggHistos { - labels, values := e.prepareAggHisto(metricRecord, &mInfo.info, mInfo.histo.MetricVec) + labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec) if labels == nil { continue } @@ -162,7 +161,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) { } } -func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, float64) { +func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, float64) { val := 
e.extractGenericValue(flow, info) if val == nil { return nil, 0 @@ -172,8 +171,11 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() return nil, 0 } + if info.ValueScale != 0 { + floatVal = floatVal / info.ValueScale + } - entryLabels, key := e.extractLabelsAndKey(flow, info) + entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem) // Update entry for expiry mechanism (the entry itself is its own cleanup function) _, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) if !ok { @@ -183,7 +185,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics return entryLabels, floatVal } -func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, []float64) { +func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, []float64) { val := e.extractGenericValue(flow, info) if val == nil { return nil, nil @@ -194,7 +196,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri return nil, nil } - entryLabels, key := e.extractLabelsAndKey(flow, info) + entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem) // Update entry for expiry mechanism (the entry itself is its own cleanup function) _, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) if !ok { @@ -204,28 +206,10 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri return entryLabels, values } -func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} { - for _, filter := range info.GetFilters() { - val, found := flow[filter.Key] - switch filter.Value { - case "nil": - if found { - return nil - } - case "!nil": - if !found { - return nil - } - default: - if found { - 
sVal, ok := val.(string) - if !ok { - sVal = fmt.Sprint(val) - } - if !slices.Contains(strings.Split(filter.Value, "|"), sVal) { - return nil - } - } +func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *metricInfo) interface{} { + for _, pred := range info.filterPredicates { + if !pred(flow) { + return nil } } if info.ValueKey == "" { @@ -291,12 +275,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En histos := []histoInfo{} aggHistos := []histoInfo{} - for _, mInfo := range cfg.Metrics { - fullMetricName := cfg.Prefix + mInfo.Name - labels := mInfo.Labels + for _, mCfg := range cfg.Metrics { + fullMetricName := cfg.Prefix + mCfg.Name + labels := mCfg.Labels log.Debugf("fullMetricName = %v", fullMetricName) log.Debugf("Labels = %v", labels) - switch mInfo.Type { + mInfo := CreateMetricInfo(mCfg) + switch mCfg.Type { case api.PromEncodeOperationName("Counter"): counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels) err := prometheus.Register(counter) @@ -320,8 +305,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En info: mInfo, }) case api.PromEncodeOperationName("Histogram"): - log.Debugf("buckets = %v", mInfo.Buckets) - hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels) + log.Debugf("buckets = %v", mCfg.Buckets) + hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) err := prometheus.Register(hist) if err != nil { log.Errorf("error during prometheus.Register: %v", err) @@ -332,8 +317,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En info: mInfo, }) case api.PromEncodeOperationName("AggHistogram"): - log.Debugf("buckets = %v", mInfo.Buckets) - hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: 
mInfo.Buckets}, labels) + log.Debugf("buckets = %v", mCfg.Buckets) + hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) err := prometheus.Register(hist) if err != nil { log.Errorf("error during prometheus.Register: %v", err) @@ -344,7 +329,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En info: mInfo, }) case "default": - log.Errorf("invalid metric type = %v, skipping", mInfo.Type) + log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue } } diff --git a/pkg/pipeline/encode/encode_prom_metric.go b/pkg/pipeline/encode/encode_prom_metric.go new file mode 100644 index 000000000..e877dc9a1 --- /dev/null +++ b/pkg/pipeline/encode/encode_prom_metric.go @@ -0,0 +1,82 @@ +package encode + +import ( + "fmt" + "regexp" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" +) + +type predicate func(flow config.GenericMap) bool + +type metricInfo struct { + api.PromMetricsItem + filterPredicates []predicate +} + +func presence(filter api.PromMetricsFilter) predicate { + return func(flow config.GenericMap) bool { + _, found := flow[filter.Key] + return found + } +} + +func absence(filter api.PromMetricsFilter) predicate { + return func(flow config.GenericMap) bool { + _, found := flow[filter.Key] + return !found + } +} + +func exact(filter api.PromMetricsFilter) predicate { + return func(flow config.GenericMap) bool { + if val, found := flow[filter.Key]; found { + sVal, ok := val.(string) + if !ok { + sVal = fmt.Sprint(val) + } + return sVal == filter.Value + } + return false + } +} + +func regex(filter api.PromMetricsFilter) predicate { + r, _ := regexp.Compile(filter.Value) + return func(flow config.GenericMap) bool { + if val, found := flow[filter.Key]; found { + sVal, ok := val.(string) + if !ok { + sVal = fmt.Sprint(val) + } + return r.MatchString(sVal) + } + return false + } +} + +func 
filterToPredicate(filter api.PromMetricsFilter) predicate { + switch filter.Type { + case api.PromFilterExact: + return exact(filter) + case api.PromFilterPresence: + return presence(filter) + case api.PromFilterAbsence: + return absence(filter) + case api.PromFilterRegex: + return regex(filter) + } + // Default = exact + return exact(filter) +} + +func CreateMetricInfo(def api.PromMetricsItem) *metricInfo { + mi := metricInfo{ + PromMetricsItem: def, + } + for _, f := range def.GetFilters() { + mi.filterPredicates = append(mi.filterPredicates, filterToPredicate(f)) + } + return &mi +} diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 0555991f5..828d5600e 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -306,7 +306,7 @@ func Test_FilterNotNil(t *testing.T) { Name: "latencies", Type: "histogram", ValueKey: "latency", - Filters: []api.PromMetricsFilter{{Key: "latency", Value: "!nil"}}, + Filters: []api.PromMetricsFilter{{Key: "latency", Type: "presence"}}, }, }, } @@ -361,7 +361,7 @@ func Test_FilterDirection(t *testing.T) { Name: "ingress_or_inner_packets_total", Type: "counter", ValueKey: "packets", - Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0|2"}}, + Filters: []api.PromMetricsFilter{{Key: "dir", Value: "0|2", Type: "regex"}}, }, }, } @@ -380,6 +380,45 @@ func Test_FilterDirection(t *testing.T) { require.Contains(t, exposed, `test_ingress_or_inner_packets_total 1010`) } +func Test_ValueScale(t *testing.T) { + metrics := []config.GenericMap{{"rtt": 15_000_000} /*15ms*/, {"rtt": 110_000_000} /*110ms*/} + params := api.PromEncode{ + Prefix: "test_", + ExpiryTime: api.Duration{Duration: time.Duration(60 * time.Second)}, + Metrics: []api.PromMetricsItem{ + { + Name: "rtt_seconds", + Type: "histogram", + ValueKey: "rtt", + ValueScale: 1_000_000_000, + }, + }, + } + + encodeProm, err := initProm(¶ms) + require.NoError(t, err) + for _, metric := range 
metrics { + encodeProm.Encode(metric) + } + time.Sleep(100 * time.Millisecond) + + exposed := test.ReadExposedMetrics(t) + + // One is less than 25ms, Two are less than 250ms + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.005"} 0`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.01"} 0`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.025"} 1`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.05"} 1`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.1"} 1`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.25"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.5"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="1"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="2.5"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="5"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="10"} 2`) + require.Contains(t, exposed, `test_rtt_seconds_bucket{le="+Inf"} 2`) +} + func Test_MetricTTL(t *testing.T) { metrics := []config.GenericMap{{ "srcIP": "20.0.0.2", @@ -485,8 +524,8 @@ func buildFlow() config.GenericMap { } func hundredFlows() []config.GenericMap { - flows := make([]config.GenericMap, 100) - for i := 0; i < 100; i++ { + flows := make([]config.GenericMap, 1000) + for i := 0; i < 1000; i++ { flows[i] = buildFlow() } return flows @@ -503,11 +542,24 @@ func BenchmarkPromEncode(b *testing.B) { Type: "counter", ValueKey: "bytes", Labels: []string{"srcIP", "dstIP"}, + Filters: []api.PromMetricsFilter{ + { + Key: "srcIP", + Value: "10.0.0.10|10.0.0.11|10.0.0.12", + Type: "regex", + }, + }, }, { Name: "packets_total", Type: "counter", ValueKey: "packets", Labels: []string{"srcIP", "dstIP"}, + Filters: []api.PromMetricsFilter{ + { + Key: "srcIP", + Value: "10.0.0.10", + }, + }, }, { Name: "latency_seconds", Type: "histogram",