Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor metrics #590

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 53 additions & 247 deletions pkg/pipeline/encode/encode_prom.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this file under a new prometheus folder?

Original file line number Diff line number Diff line change
Expand Up @@ -18,249 +18,89 @@
package encode

import (
"fmt"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// defaultExpiryTime is how long a metric series may go without updates before
// it is evicted from the cache and deleted from its Prometheus vector.
// Note: 2*time.Minute is already a time.Duration, so no conversion is needed.
const defaultExpiryTime = 2 * time.Minute

// gaugeInfo pairs a registered Prometheus gauge vector with the metric
// configuration it was built from.
type gaugeInfo struct {
gauge *prometheus.GaugeVec
info *MetricInfo
}

// counterInfo pairs a registered Prometheus counter vector with the metric
// configuration it was built from.
type counterInfo struct {
counter *prometheus.CounterVec
info *MetricInfo
}

// histoInfo pairs a registered Prometheus histogram vector with the metric
// configuration it was built from. It is used for both plain and
// pre-aggregated histograms.
type histoInfo struct {
histo *prometheus.HistogramVec
info *MetricInfo
}

type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
gauges []gaugeInfo
counters []counterInfo
histos []histoInfo
aggHistos []histoInfo
expiryTime time.Duration
mCache *putils.TimedCache
mChacheLenMetric prometheus.Gauge
exitChan <-chan struct{}
metricsProcessed prometheus.Counter
metricsDropped prometheus.Counter
errorsCounter *prometheus.CounterVec
cfg *api.PromEncode
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
}

// Operational (self-monitoring) metrics exposed by this encode stage.
var (
// MetricsProcessed counts metric samples successfully emitted, per stage.
MetricsProcessed = operational.DefineMetric(
"metrics_processed",
"Number of metrics processed",
operational.TypeCounter,
"stage",
)
// MetricsDropped counts metric samples dropped (e.g. when the timed cache
// refuses a new entry), per stage.
MetricsDropped = operational.DefineMetric(
"metrics_dropped",
"Number of metrics dropped",
operational.TypeCounter,
"stage",
)
// EncodePromErrors counts errors during metric generation, labeled by
// error kind, metric name and record key.
EncodePromErrors = operational.DefineMetric(
"encode_prom_errors",
"Total errors during metrics generation",
operational.TypeCounter,
"error", "metric", "key",
)
// mChacheLen gauges the number of live series tracked by this stage.
// NOTE(review): "mChacheLen" looks like a typo for "mCacheLen" — renaming
// would touch other references in this file, so it is left as-is here.
mChacheLen = operational.DefineMetric(
"encode_prom_metrics_reported",
"Total number of prometheus metrics reported by this stage",
operational.TypeGauge,
"stage",
)
)

// Encode encodes a metric before being stored
// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)

// Process counters
for _, mInfo := range e.counters {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.counter.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Add(value)
e.metricsProcessed.Inc()
}

// Process gauges
for _, mInfo := range e.gauges {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.gauge.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Set(value)
e.metricsProcessed.Inc()
}

// Process histograms
for _, mInfo := range e.histos {
labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.histo.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
m.Observe(value)
e.metricsProcessed.Inc()
}

// Process pre-aggregated histograms
for _, mInfo := range e.aggHistos {
labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec)
if labels == nil {
continue
}
m, err := mInfo.histo.GetMetricWith(labels)
if err != nil {
log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err)
e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc()
continue
}
for _, v := range values {
m.Observe(v)
}
e.metricsProcessed.Inc()
}
e.metricCommon.MetricCommonEncode(e, metricRecord)
}

func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0
}
floatVal, err := utils.ConvertToFloat64(val)
func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
counter := m.(*prometheus.CounterVec)
mm, err := counter.GetMetricWith(labels)
if err != nil {
e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc()
return nil, 0
}
if info.ValueScale != 0 {
floatVal = floatVal / info.ValueScale
return err
}

entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, 0
}
return entryLabels, floatVal
mm.Add(value)
return nil
}

func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, nil
}
values, ok := val.([]float64)
if !ok {
e.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc()
return nil, nil
// ProcessGauge sets value on the gauge series identified by labels.
// m must be a *prometheus.GaugeVec. The key argument is part of the shared
// metric-processing callback signature and is unused by the gauge path.
func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error {
	gaugeVec := m.(*prometheus.GaugeVec)
	series, err := gaugeVec.GetMetricWith(labels)
	if err == nil {
		series.Set(value)
	}
	return err
}

entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
e.metricsDropped.Inc()
return nil, nil
func (e *EncodeProm) ProcessHist(m interface{}, labels map[string]string, value float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
return err
}
return entryLabels, values
mm.Observe(value)
return nil
}

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} {
for _, pred := range info.FilterPredicates {
if !pred(flow) {
return nil
}
}
if info.ValueKey == "" {
// No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1
return 1
func (e *EncodeProm) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error {
hist := m.(*prometheus.HistogramVec)
mm, err := hist.GetMetricWith(labels)
if err != nil {
return err
}
val, found := flow[info.ValueKey]
if !found {
e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc()
return nil
for _, v := range values {
mm.Observe(v)
}
return val
return nil
}

func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.Labels))
key := strings.Builder{}
key.WriteString(info.Name)
key.WriteRune('|')
for _, t := range info.Labels {
entryLabels[t] = ""
if v, ok := flow[t]; ok {
entryLabels[t] = fmt.Sprintf("%v", v)
}
key.WriteString(entryLabels[t])
key.WriteRune('|')
func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} {
switch mv := m.(type) {
case *prometheus.CounterVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.GaugeVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.HistogramVec:
return func() { mv.Delete(entryLabels) }
}
return entryLabels, key.String()
return nil
}

// Cleanup is the expiry callback invoked by the timed cache: it receives the
// opaque per-entry cleanup closure stored at insertion time and runs it.
// It panics if cleanupFunc is not a func(), matching how entries are stored.
func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
	fn := cleanupFunc.(func())
	fn()
}

// cleanupExpiredEntriesLoop periodically evicts metric series that have not
// been updated within e.expiryTime, running until the stage's exit channel is
// closed. Each evicted cache entry carries its own cleanup closure, executed
// via e.Cleanup.
func (e *EncodeProm) cleanupExpiredEntriesLoop() {
	ticker := time.NewTicker(e.expiryTime)
	// Stop the ticker on exit so its resources are released; the original
	// code leaked the ticker when the loop returned.
	defer ticker.Stop()
	for {
		select {
		case <-e.exitChan:
			log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
			return
		case <-ticker.C:
			e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup)
		}
	}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -282,11 +122,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
} else {
registerer = prometheus.DefaultRegisterer
}
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
}

counters := []counterInfo{}
gauges := []gaugeInfo{}
histos := []histoInfo{}
aggHistos := []histoInfo{}
metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
w.metricCommon = metricCommon

for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
Expand All @@ -302,21 +144,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
counters = append(counters, counterInfo{
counter: counter,
info: mInfo,
})
metricCommon.AddCounter(counter, mInfo)
case api.MetricEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
gauges = append(gauges, gaugeInfo{
gauge: gauge,
info: mInfo,
})
metricCommon.AddGauge(gauge, mInfo)
case api.MetricEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -325,10 +161,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
histos = append(histos, histoInfo{
histo: hist,
info: mInfo,
})
metricCommon.AddHist(hist, mInfo)
case api.MetricEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -337,38 +170,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
aggHistos = append(aggHistos, histoInfo{
histo: hist,
info: mInfo,
})
metricCommon.AddAggHist(hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}

log.Debugf("counters = %v", counters)
log.Debugf("gauges = %v", gauges)
log.Debugf("histos = %v", histos)
log.Debugf("aggHistos = %v", aggHistos)

mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name)

w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
counters: counters,
gauges: gauges,
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime.Duration,
mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric),
mChacheLenMetric: mChacheLenMetric,
exitChan: putils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&MetricsProcessed, params.Name),
metricsDropped: opMetrics.NewCounter(&MetricsDropped, params.Name),
errorsCounter: opMetrics.NewCounterVec(&EncodePromErrors),
}
go w.cleanupExpiredEntriesLoop()
return w, nil
}
Loading
Loading