Skip to content

Commit

Permalink
Improve promencode API: scaling & filters
Browse files Browse the repository at this point in the history
- New ValueScale config to allow scale conversion, e.g. RTT ns to
  seconds
- To avoid having reserved characters or words in filters, use a dedicated
  API instead: new setting Filter.Type (exact,presence,absence or regex)
- Pre-compute regex by setting up a "predicate" function per filter
  • Loading branch information
jotak committed Nov 8, 2023
1 parent 47c2095 commit 9606faa
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 56 deletions.
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
AddServiceRuleType = "add_service"
AddKubernetesRuleType = "add_kubernetes"
ReinterpretDirectionRuleType = "reinterpret_direction"
PromFilterExact = "exact"
PromFilterPresence = "presence"
PromFilterAbsence = "absence"
PromFilterRegex = "regex"

TagYaml = "yaml"
TagDoc = "doc"
Expand Down
29 changes: 21 additions & 8 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ func PromEncodeOperationName(operation string) string {
}

type PromMetricsItem struct {
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. Deprecated: use filters instead."`
Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"`
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Filter PromMetricsFilter `yaml:"filter,omitempty" json:"filter,omitempty" doc:"an optional criterion to filter entries by. Deprecated: use filters instead."`
Filters []PromMetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"`
ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"`
Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"`
Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"`
ValueScale float64 `yaml:"valueScale" json:"valueScale" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"`
}

func (i *PromMetricsItem) GetFilters() []PromMetricsFilter {
Expand All @@ -61,5 +62,17 @@ type PromMetricsItems []PromMetricsItem

type PromMetricsFilter struct {
Key string `yaml:"key" json:"key" doc:"the key to match and filter by"`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by. Use !nil / nil to match presence / absence. Add multiple matching values using '|' rune such as 'a|b' to match either 'a' or 'b'."`
Value string `yaml:"value" json:"value" doc:"the value to match and filter by"`
Type string `yaml:"type" json:"type" enum:"PromEncodeFilterTypeEnum" doc:"the type of filter match: exact (default), presence, absence or regex"`
}

type PromEncodeFilterTypeEnum struct {
Exact string `yaml:"exact" json:"exact" doc:"match exactly the provided fitler value"`
Presence string `yaml:"presence" json:"presence" doc:"filter key must be present (filter value is ignored)"`
Absence string `yaml:"absence" json:"absence" doc:"filter key must be absent (filter value is ignored)"`
Regex string `yaml:"regex" json:"regex" doc:"match filter value as a regular expression"`
}

func PromEncodeFilterTypeName(t string) string {
return GetEnumName(PromEncodeFilterTypeEnum{}, t)
}
1 change: 1 addition & 0 deletions pkg/api/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

type enums struct {
PromEncodeOperationEnum PromEncodeOperationEnum
PromEncodeFilterTypeEnum PromEncodeFilterTypeEnum
TransformNetworkOperationEnum TransformNetworkOperationEnum
TransformFilterOperationEnum TransformFilterOperationEnum
TransformGenericOperationEnum TransformGenericOperationEnum
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","value":""},"filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"","type":"","value":""},"filters":[{"key":"name","type":"","value":"src_as_connection_count"}],"valueKey":"recent_count","valueScale":0,"labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand Down
71 changes: 28 additions & 43 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"k8s.io/utils/strings/slices"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand All @@ -37,17 +36,17 @@ const defaultExpiryTime = time.Duration(2 * time.Minute)

type gaugeInfo struct {
gauge *prometheus.GaugeVec
info api.PromMetricsItem
info *metricInfo
}

type counterInfo struct {
counter *prometheus.CounterVec
info api.PromMetricsItem
info *metricInfo
}

type histoInfo struct {
histo *prometheus.HistogramVec
info api.PromMetricsItem
info *metricInfo
}

type EncodeProm struct {
Expand Down Expand Up @@ -97,7 +96,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process counters
for _, mInfo := range e.counters {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.counter.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec)
if labels == nil {
continue
}
Expand All @@ -113,7 +112,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process gauges
for _, mInfo := range e.gauges {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.gauge.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec)
if labels == nil {
continue
}
Expand All @@ -129,7 +128,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process histograms
for _, mInfo := range e.histos {
labels, value := e.prepareMetric(metricRecord, &mInfo.info, mInfo.histo.MetricVec)
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
Expand All @@ -145,7 +144,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {

// Process pre-aggregated histograms
for _, mInfo := range e.aggHistos {
labels, values := e.prepareAggHisto(metricRecord, &mInfo.info, mInfo.histo.MetricVec)
labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
Expand All @@ -162,7 +161,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
}
}

func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, float64) {
func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0
Expand All @@ -172,8 +171,11 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics
e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc()
return nil, 0
}
if info.ValueScale != 0 {
floatVal = floatVal / info.ValueScale
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -183,7 +185,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *api.PromMetrics
return entryLabels, floatVal
}

func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetricsItem, m *prometheus.MetricVec) (map[string]string, []float64) {
func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, nil
Expand All @@ -194,7 +196,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri
return nil, nil
}

entryLabels, key := e.extractLabelsAndKey(flow, info)
entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -204,28 +206,10 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *api.PromMetri
return entryLabels, values
}

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} {
for _, filter := range info.GetFilters() {
val, found := flow[filter.Key]
switch filter.Value {
case "nil":
if found {
return nil
}
case "!nil":
if !found {
return nil
}
default:
if found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
if !slices.Contains(strings.Split(filter.Value, "|"), sVal) {
return nil
}
}
func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *metricInfo) interface{} {
for _, pred := range info.filterPredicates {
if !pred(flow) {
return nil
}
}
if info.ValueKey == "" {
Expand Down Expand Up @@ -291,12 +275,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
histos := []histoInfo{}
aggHistos := []histoInfo{}

for _, mInfo := range cfg.Metrics {
fullMetricName := cfg.Prefix + mInfo.Name
labels := mInfo.Labels
for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
labels := mCfg.Labels
log.Debugf("fullMetricName = %v", fullMetricName)
log.Debugf("Labels = %v", labels)
switch mInfo.Type {
mInfo := CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.PromEncodeOperationName("Counter"):
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(counter)
Expand All @@ -320,8 +305,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case api.PromEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mInfo.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels)
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
Expand All @@ -332,8 +317,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case api.PromEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mInfo.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, labels)
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
Expand All @@ -344,7 +329,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
info: mInfo,
})
case "default":
log.Errorf("invalid metric type = %v, skipping", mInfo.Type)
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/pipeline/encode/encode_prom_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package encode

import (
"fmt"
"regexp"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

type predicate func(flow config.GenericMap) bool

type metricInfo struct {
api.PromMetricsItem
filterPredicates []predicate
}

func presence(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return found
}
}

func absence(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return !found
}
}

func exact(filter api.PromMetricsFilter) predicate {
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return sVal == filter.Value
}
return false
}
}

func regex(filter api.PromMetricsFilter) predicate {
r, _ := regexp.Compile(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return r.MatchString(sVal)
}
return false
}
}

func filterToPredicate(filter api.PromMetricsFilter) predicate {
switch filter.Type {
case api.PromFilterExact:
return exact(filter)
case api.PromFilterPresence:
return presence(filter)
case api.PromFilterAbsence:
return absence(filter)
case api.PromFilterRegex:
return regex(filter)
}
// Default = exact
return exact(filter)
}

func CreateMetricInfo(def api.PromMetricsItem) *metricInfo {
mi := metricInfo{
PromMetricsItem: def,
}
for _, f := range def.GetFilters() {
mi.filterPredicates = append(mi.filterPredicates, filterToPredicate(f))
}
return &mi
}
Loading

0 comments on commit 9606faa

Please sign in to comment.