Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1231: Improve promencode API: scaling & filters #513

Merged
merged 2 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,24 @@ Following is the supported API format for prometheus encode:
agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage
filter: an optional criterion to filter entries by. Deprecated: use filters instead.
key: the key to match and filter by
value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'.
value: the value to match and filter by
type: (enum) the type of filter match: exact (default), presence, absence or regex
exact: match exactly the provided fitler value
presence: filter key must be present (filter value is ignored)
absence: filter key must be absent (filter value is ignored)
regex: match filter value as a regular expression
filters: a list of criteria to filter entries by
key: the key to match and filter by
value: the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'.
value: the value to match and filter by
type: (enum) the type of filter match: exact (default), presence, absence or regex
exact: match exactly the provided fitler value
presence: filter key must be present (filter value is ignored)
absence: filter key must be absent (filter value is ignored)
regex: match filter value as a regular expression
valueKey: entry key from which to resolve metric value
labels: labels to be associated with the metric
buckets: histogram buckets
valueScale: scale factor of the value (MetricVal := FlowVal / Scale)
prefix: prefix added to each metric name
expiryTime: time duration of no-flow to wait before deleting prometheus data item
maxMetrics: maximum number of metrics to report (default: unlimited)
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
AddServiceRuleType = "add_service"
AddKubernetesRuleType = "add_kubernetes"
ReinterpretDirectionRuleType = "reinterpret_direction"
PromFilterExact = "exact"
PromFilterPresence = "presence"
PromFilterAbsence = "absence"
PromFilterRegex = "regex"

TagYaml = "yaml"
TagDoc = "doc"
Expand Down
29 changes: 21 additions & 8 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ func PromEncodeOperationName(operation string) string {
}

type PromMetricsItem struct {
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. Deprecated: use filters instead."`
Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"`
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. Deprecated: use filters instead."`
Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"`
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
ValueScale float64 `yaml:"valueScale" json:"valueScale" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"`
}

func (i *PromMetricsItem) GetFilters() []PromMetricsFilter {
Expand All @@ -61,5 +62,17 @@ type PromMetricsItems []PromMetricsItem

type PromMetricsFilter struct {
Key string `yaml:"key" json:"key" doc:"the key to match and filter by"`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'."`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by"`
Type string `yaml:"type" json:"type" enum:"PromEncodeFilterTypeEnum" doc:"the type of filter match: exact (default), presence, absence or regex"`
}

type PromEncodeFilterTypeEnum struct {
Exact string `yaml:"exact" json:"exact" doc:"match exactly the provided fitler value"`
Presence string `yaml:"presence" json:"presence" doc:"filter key must be present (filter value is ignored)"`
Absence string `yaml:"absence" json:"absence" doc:"filter key must be absent (filter value is ignored)"`
Regex string `yaml:"regex" json:"regex" doc:"match filter value as a regular expression"`
}

func PromEncodeFilterTypeName(t string) string {
return GetEnumName(PromEncodeFilterTypeEnum{}, t)
}
1 change: 1 addition & 0 deletions pkg/api/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

type enums struct {
PromEncodeOperationEnum PromEncodeOperationEnum
PromEncodeFilterTypeEnum PromEncodeFilterTypeEnum
TransformNetworkOperationEnum TransformNetworkOperationEnum
TransformFilterOperationEnum TransformFilterOperationEnum
TransformGenericOperationEnum TransformGenericOperationEnum
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","value":""},"filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","type":"","value":""},"filters":[{"key":"name","type":"","value":"src_as_connection_count"}],"valueKey":"recent_count","valueScale":0,"labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand Down
71 changes: 28 additions & 43 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"k8s.io/utils/strings/slices"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand All @@ -37,17 +36,17 @@ const defaultExpiryTime = time.Duration(2 * time.Minute)

type gaugeInfo struct {
gauge *prometheus.GaugeVec
info api.PromMetricsItem
info *metricInfo
}

type counterInfo struct {
counter *prometheus.CounterVec
info api.PromMetricsItem
info *metricInfo
}

type histoInfo struct {
histo *prometheus.HistogramVec
info api.PromMetricsItem
info *metricInfo
}

type EncodeProm struct {
Expand Down Expand Up @@ -97,7 +96,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process counters
for _, mInfo := range e.counters {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.counter.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec)
if labels == nil {
continue
}
Expand All @@ -113,7 +112,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process gauges
for _, mInfo := range e.gauges {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.gauge.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec)
if labels == nil {
continue
}
Expand All @@ -129,7 +128,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process histograms
for _, mInfo := range e.histos {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.histo.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
Expand All @@ -145,7 +144,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process pre-aggregated histograms
for _, mInfo := range e.aggHistos {
labels, values := e.prepareAggHisto(metricRecord, &mInfo.info, mInfo.histo.MetricVec)
labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
Expand All @@ -162,7 +161,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
}
}

func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, float64) {
func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0
Expand All @@ -172,8 +171,11 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics
e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc()
return nil, 0
}
if info.ValueScale != 0 {
floatVal = floatVal / info.ValueScale
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -183,7 +185,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics
return entryLabels, floatVal
}

func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, []float64) {
func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, nil
Expand All @@ -194,7 +196,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri
return nil, nil
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -204,28 +206,10 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri
return entryLabels, values
}

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} {
for _, filter := range info.GetFilters() {
val, found := flow[filter.Key]
switch filter.Value {
case "nil":
if found {
return nil
}
case "!nil":
if !found {
return nil
}
default:
if found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
if !slices.Contains(strings.Split(filter.Value, "|"), sVal) {
return nil
}
}
func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *metricInfo) interface{} {
for _, pred := range info.filterPredicates {
if !pred(flow) {
return nil
}
}
if info.ValueKey == "" {
Expand Down Expand Up @@ -291,12 +275,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
histos := []histoInfo{}
aggHistos := []histoInfo{}

for _, mInfo := range cfg.Metrics {
fullMetricName := cfg.Prefix + mInfo.Name
labels := mInfo.Labels
for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
labels := mCfg.Labels
log.Debugf("fullMetricName = %v", fullMetricName)
log.Debugf("Labels = %v", labels)
switch mInfo.Type {
mInfo := CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.PromEncodeOperationName("Counter"):
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(counter)
Expand All @@ -320,8 +305,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case api.PromEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mInfo.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels)
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
Expand All @@ -332,8 +317,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case api.PromEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mInfo.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels)
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
Expand All @@ -344,7 +329,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case "default":
log.Errorf("invalid metric type = %v, skipping", mInfo.Type)
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/pipeline/encode/encode_prom_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package encode

import (
"fmt"
"regexp"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

type predicate func(flow config.GenericMap) bool

type metricInfo struct {
api.PromMetricsItem
filterPredicates []predicate
}

func presence(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return found
}
}

func absence(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return !found
}
}

func exact(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return sVal == filter.Value
}
return false
}
}

func regex(filter api.PromMetricsFilter) predicate {
r, _ := regexp.Compile(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return r.MatchString(sVal)
}
return false
}
}

func filterToPredicate(filter api.PromMetricsFilter) predicate {
switch filter.Type {
case api.PromFilterExact:
return exact(filter)
case api.PromFilterPresence:
return presence(filter)
case api.PromFilterAbsence:
return absence(filter)
case api.PromFilterRegex:
return regex(filter)
}
// Default = exact
return exact(filter)
}

func CreateMetricInfo(def api.PromMetricsItem) *metricInfo {
mi := metricInfo{
PromMetricsItem: def,
}
for _, f := range def.GetFilters() {
mi.filterPredicates = append(mi.filterPredicates, filterToPredicate(f))
}
return &mi
}
Loading
Loading