From 48937b22b8dbdb4b2d46c517833a45411cc2689d Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Tue, 6 Feb 2024 11:41:52 +0200 Subject: [PATCH 1/5] refactor metrics code to be common for prometheus and otlp --- .../{opentelemetry => }/encode_otlp_test.go | 29 +- .../{opentelemetry => }/encode_otlplogs.go | 5 +- pkg/pipeline/encode/encode_otlpmetrics.go | 180 +++++++++++ .../{opentelemetry => }/encode_otlptrace.go | 5 +- pkg/pipeline/encode/encode_prom.go | 300 ++++-------------- pkg/pipeline/encode/encode_prom_test.go | 48 +-- .../{opentelemetry => }/opentelemetry.go | 2 +- .../opentelemetry/encode_otlpmetrics.go | 284 ----------------- pkg/pipeline/encode/prom_cache_test.go | 12 +- pkg/pipeline/pipeline_builder.go | 7 +- 10 files changed, 285 insertions(+), 587 deletions(-) rename pkg/pipeline/encode/{opentelemetry => }/encode_otlp_test.go (88%) rename pkg/pipeline/encode/{opentelemetry => }/encode_otlplogs.go (93%) create mode 100644 pkg/pipeline/encode/encode_otlpmetrics.go rename pkg/pipeline/encode/{opentelemetry => }/encode_otlptrace.go (96%) rename pkg/pipeline/encode/{opentelemetry => }/opentelemetry.go (99%) delete mode 100644 pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go b/pkg/pipeline/encode/encode_otlp_test.go similarity index 88% rename from pkg/pipeline/encode/opentelemetry/encode_otlp_test.go rename to pkg/pipeline/encode/encode_otlp_test.go index 5d882ebef..5e2206de0 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go +++ b/pkg/pipeline/encode/encode_otlp_test.go @@ -15,7 +15,7 @@ * */ -package opentelemetry +package encode import ( "encoding/json" @@ -26,7 +26,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" ) @@ -50,18 +49,18 @@ type fakeOltpLoggerProvider struct { type fakeOltpLogger struct { } -var receivedData []logs.LogRecord +var otlpReceivedData []logs.LogRecord func (f *fakeOltpLogger) Emit(msg logs.LogRecord) { - receivedData = append(receivedData, msg) + otlpReceivedData = append(otlpReceivedData, msg) } func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger { return &fakeOltpLogger{} } -func initNewEncodeOtlpLogs(t *testing.T) encode.Encoder { - receivedData = []logs.LogRecord{} +func initNewEncodeOtlpLogs(t *testing.T) Encoder { + otlpReceivedData = []logs.LogRecord{} v, cfg := test.InitConfig(t, testOtlpConfig) require.NotNil(t, v) @@ -88,14 +87,14 @@ func Test_EncodeOtlpLogs(t *testing.T) { newEncode.Encode(entry1) newEncode.Encode(entry2) // verify contents of the output - require.Len(t, receivedData, 2) + require.Len(t, otlpReceivedData, 2) expected1, _ := json.Marshal(entry1) expected2, _ := json.Marshal(entry2) - require.Equal(t, string(expected1), *receivedData[0].Body()) - require.Equal(t, string(expected2), *receivedData[1].Body()) + require.Equal(t, string(expected1), *otlpReceivedData[0].Body()) + require.Equal(t, string(expected2), *otlpReceivedData[1].Body()) } -func Test_TLSConfigEmpty(t *testing.T) { +func Test_EncodeOtlpTLSConfigEmpty(t *testing.T) { cfg := config.StageParam{ Encode: &config.Encode{ OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ @@ -111,7 +110,7 @@ func Test_TLSConfigEmpty(t *testing.T) { require.NotNil(t, newEncode) } -func Test_TLSConfigNotEmpty(t *testing.T) { +func Test_EncodeOtlpTLSConfigNotEmpty(t *testing.T) { cfg := config.StageParam{ Encode: &config.Encode{ OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ @@ -194,8 +193,8 @@ func Test_EncodeOtlpMetrics(t *testing.T) { require.NoError(t, err) require.NotNil(t, newEncode) em := newEncode.(*EncodeOtlpMetrics) - require.Equal(t, 2, len(em.counters)) - require.Equal(t, 1, len(em.gauges)) - require.Equal(t, "metric3", em.counters[1].info.Name) - require.Equal(t, []string{"label21", "label22"}, em.gauges[0].info.Labels) + require.Equal(t, 2, len(em.metricCommon.counters)) + require.Equal(t, 1, len(em.metricCommon.gauges)) + require.Equal(t, "metric3", em.metricCommon.counters[1].info.Name) + require.Equal(t, []string{"label21", "label22"}, em.metricCommon.gauges[0].info.Labels) } diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go b/pkg/pipeline/encode/encode_otlplogs.go similarity index 93% rename from pkg/pipeline/encode/opentelemetry/encode_otlplogs.go rename to pkg/pipeline/encode/encode_otlplogs.go index d67a073c8..041b51b03 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go +++ b/pkg/pipeline/encode/encode_otlplogs.go @@ -15,7 +15,7 @@ * */ -package opentelemetry +package encode import ( "context" @@ -24,7 +24,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" ) @@ -42,7 +41,7 @@ func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) { e.LogWrite(entry) } -func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { +func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { log.Tracef("entering NewEncodeOtlpLogs \n") cfg := api.EncodeOtlpLogs{} if params.Encode != nil && params.Encode.OtlpLogs != nil { diff --git a/pkg/pipeline/encode/encode_otlpmetrics.go b/pkg/pipeline/encode/encode_otlpmetrics.go new file mode 100644 index 000000000..cffe8f3aa --- /dev/null +++ b/pkg/pipeline/encode/encode_otlpmetrics.go @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package encode + +import ( + "context" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" +) + +const flpMeterName = "flp_meter" + +type EncodeOtlpMetrics struct { + cfg api.EncodeOtlpMetrics + ctx context.Context + res *resource.Resource + mp *sdkmetric.MeterProvider + meter metric.Meter + metricCommon *MetricsCommonStruct +} + +// Encode encodes a metric to be exported +func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) { + log.Tracef("entering EncodeOtlpMetrics. entry = %v", metricRecord) + e.metricCommon.MetricCommonEncode(e, metricRecord) +} + +func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]string, value float64) error { + counter := m.(metric.Float64Counter) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + counter.Add(e.ctx, value, metric.WithAttributes(attributes...)) + return nil +} + +func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error { + obs := m.(Float64Gauge) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + obs.Set(key, value, attributes) + return nil +} + +func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error { + return fmt.Errorf("histogram not yet implemented") +} + +func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { + return fmt.Errorf("agg histogram not yet implemented") +} + +func (e *EncodeOtlpMetrics) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} { + return entryLabels +} + +func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { + log.Tracef("entering NewEncodeOtlpMetrics \n") + cfg := api.EncodeOtlpMetrics{} + if params.Encode != nil && params.Encode.OtlpMetrics != nil { + cfg = *params.Encode.OtlpMetrics + } + log.Debugf("NewEncodeOtlpMetrics cfg = %v \n", cfg) + + ctx := context.Background() + res := newResource() + + mp, err := NewOtlpMetricsProvider(ctx, params, res) + if err != nil { + return nil, err + } + meter := mp.Meter( + flpMeterName, + ) + + expiryTime := cfg.ExpiryTime + if expiryTime.Duration == 0 { + expiryTime.Duration = defaultExpiryTime + } + + meterFactory := otel.Meter(flpMeterName) + + w := &EncodeOtlpMetrics{ + cfg: cfg, + ctx: ctx, + res: res, + mp: mp, + meter: meterFactory, + } + + metricCommon := NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil) + w.metricCommon = metricCommon + + for _, mCfg := range cfg.Metrics { + fullMetricName := cfg.Prefix + mCfg.Name + labels := mCfg.Labels + log.Debugf("fullMetricName = %v", fullMetricName) + log.Debugf("Labels = %v", labels) + mInfo := CreateMetricInfo(mCfg) + switch mCfg.Type { + case api.MetricEncodeOperationName("Counter"): + counter, err := meter.Float64Counter(fullMetricName) + if err != nil { + log.Errorf("error during counter creation: %v", err) + return nil, err + } + metricCommon.AddCounter(counter, mInfo) + case api.MetricEncodeOperationName("Gauge"): + // at implementation time, only asynchronous gauges are supported by otel in golang + obs := Float64Gauge{observations: make(map[string]Float64GaugeEntry)} + gauge, err := meterFactory.Float64ObservableGauge( + fullMetricName, + metric.WithFloat64Callback(obs.Callback), + ) + if err != nil { + log.Errorf("error during gauge creation: %v", err) + return nil, err + } + metricCommon.AddGauge(gauge, mInfo) + // TBD: handle histograms + case "default": + log.Errorf("invalid metric type = %v, skipping", mCfg.Type) + continue + } + } + + return w, nil +} + +// At present, golang only supports asynchronous gauge, so we have some function here to support this + +type Float64GaugeEntry struct { + attributes []attribute.KeyValue + value float64 +} + +type Float64Gauge struct { + observations map[string]Float64GaugeEntry +} + +// Callback implements the callback function for the underlying asynchronous gauge +// it observes the current state of all previous Set() calls. +func (f *Float64Gauge) Callback(ctx context.Context, o metric.Float64Observer) error { + for _, fEntry := range f.observations { + o.Observe(fEntry.value, metric.WithAttributes(fEntry.attributes...)) + } + // re-initialize the observed items + f.observations = make(map[string]Float64GaugeEntry) + return nil +} + +func (f *Float64Gauge) Set(key string, val float64, attrs []attribute.KeyValue) { + f.observations[key] = Float64GaugeEntry{ + value: val, + attributes: attrs, + } +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go b/pkg/pipeline/encode/encode_otlptrace.go similarity index 96% rename from pkg/pipeline/encode/opentelemetry/encode_otlptrace.go rename to pkg/pipeline/encode/encode_otlptrace.go index e1fe98791..2ba7d0269 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go +++ b/pkg/pipeline/encode/encode_otlptrace.go @@ -15,7 +15,7 @@ * */ -package opentelemetry +package encode import ( "context" @@ -24,7 +24,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -91,7 +90,7 @@ OUTER: } } -func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { +func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { log.Tracef("entering NewEncodeOtlpTraces \n") cfg := api.EncodeOtlpTraces{} if params.Encode != nil && params.Encode.OtlpTraces != nil { diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 9780e1bea..9cdef8e1a 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,229 +18,82 @@ package encode import ( - "fmt" - "strings" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" - "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) const defaultExpiryTime = time.Duration(2 * time.Minute) -type gaugeInfo struct { - gauge *prometheus.GaugeVec - info *MetricInfo -} - -type counterInfo struct { - counter *prometheus.CounterVec - info *MetricInfo -} - -type histoInfo struct { - histo *prometheus.HistogramVec - info *MetricInfo -} - type EncodeProm struct { - cfg *api.PromEncode - registerer prometheus.Registerer - gauges []gaugeInfo - counters []counterInfo - histos []histoInfo - aggHistos []histoInfo - expiryTime time.Duration - mCache *putils.TimedCache - mChacheLenMetric prometheus.Gauge - exitChan <-chan struct{} - metricsProcessed prometheus.Counter - metricsDropped prometheus.Counter - errorsCounter *prometheus.CounterVec + cfg *api.PromEncode + registerer prometheus.Registerer + metricCommon *MetricsCommonStruct } -var ( - MetricsProcessed = operational.DefineMetric( - "metrics_processed", - "Number of metrics processed", - operational.TypeCounter, - "stage", - ) - MetricsDropped = operational.DefineMetric( - "metrics_dropped", - "Number of metrics dropped", - operational.TypeCounter, - "stage", - ) - EncodePromErrors = operational.DefineMetric( - "encode_prom_errors", - "Total errors during metrics generation", - operational.TypeCounter, - "error", "metric", "key", - ) - mChacheLen = operational.DefineMetric( - "encode_prom_metrics_reported", - "Total number of prometheus metrics reported by this stage", - operational.TypeGauge, - "stage", - ) -) - -// Encode encodes a metric before being stored +// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode func (e *EncodeProm) Encode(metricRecord config.GenericMap) { log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord) - - // Process counters - for _, mInfo := range e.counters { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.counter.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Add(value) - e.metricsProcessed.Inc() - } - - // Process gauges - for _, mInfo := range e.gauges { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.gauge.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Set(value) - e.metricsProcessed.Inc() - } - - // Process histograms - for _, mInfo := range e.histos { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.histo.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Observe(value) - e.metricsProcessed.Inc() - } - - // Process pre-aggregated histograms - for _, mInfo := range e.aggHistos { - labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.histo.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - for _, v := range values { - m.Observe(v) - } - e.metricsProcessed.Inc() - } + e.metricCommon.MetricCommonEncode(e, metricRecord) } -func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, float64) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, 0 - } - floatVal, err := utils.ConvertToFloat64(val) +func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error { + counter := m.(*prometheus.CounterVec) + mm, err := counter.GetMetricWith(labels) if err != nil { - e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() - return nil, 0 - } - if info.ValueScale != 0 { - floatVal = floatVal / info.ValueScale + return err } - - entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) - if !ok { - e.metricsDropped.Inc() - return nil, 0 - } - return entryLabels, floatVal + mm.Add(value) + return nil } -func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, []float64) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, nil - } - values, ok := val.([]float64) - if !ok { - e.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() - return nil, nil +func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error { + gauge := m.(*prometheus.GaugeVec) + mm, err := gauge.GetMetricWith(labels) + if err != nil { + return err } + mm.Set(value) + return nil +} - entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) - if !ok { - e.metricsDropped.Inc() - return nil, nil +func (e *EncodeProm) ProcessHist(m interface{}, labels map[string]string, value float64) error { + hist := m.(*prometheus.HistogramVec) + mm, err := hist.GetMetricWith(labels) + if err != nil { + return err } - return entryLabels, values + mm.Observe(value) + return nil } -func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} { - for _, pred := range info.FilterPredicates { - if !pred(flow) { - return nil - } - } - if info.ValueKey == "" { - // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 - return 1 +func (e *EncodeProm) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { + hist := m.(*prometheus.HistogramVec) + mm, err := hist.GetMetricWith(labels) + if err != nil { + return err } - val, found := flow[info.ValueKey] - if !found { - e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() - return nil + for _, v := range values { + mm.Observe(v) } - return val + return nil } -func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { - entryLabels := make(map[string]string, len(info.Labels)) - key := strings.Builder{} - key.WriteString(info.Name) - key.WriteRune('|') - for _, t := range info.Labels { - entryLabels[t] = "" - if v, ok := flow[t]; ok { - entryLabels[t] = fmt.Sprintf("%v", v) - } - key.WriteString(entryLabels[t]) - key.WriteRune('|') +func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} { + switch mv := m.(type) { + case *prometheus.CounterVec: + return func() { mv.Delete(entryLabels) } + case *prometheus.GaugeVec: + return func() { mv.Delete(entryLabels) } + case *prometheus.HistogramVec: + return func() { mv.Delete(entryLabels) } } - return entryLabels, key.String() + return nil } // callback function from lru cleanup @@ -248,19 +101,6 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { cleanupFunc.(func())() } -func (e *EncodeProm) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(e.expiryTime) - for { - select { - case <-e.exitChan: - log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") - return - case <-ticker.C: - e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) - } - } -} - func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { cfg := api.PromEncode{} if params.Encode != nil && params.Encode.Prom != nil { @@ -282,11 +122,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En } else { registerer = prometheus.DefaultRegisterer } + w := &EncodeProm{ + cfg: params.Encode.Prom, + registerer: registerer, + } - counters := []counterInfo{} - gauges := []gaugeInfo{} - histos := []histoInfo{} - aggHistos := []histoInfo{} + metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup) + w.metricCommon = metricCommon for _, mCfg := range cfg.Metrics { fullMetricName := cfg.Prefix + mCfg.Name @@ -302,10 +144,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - counters = append(counters, counterInfo{ - counter: counter, - info: mInfo, - }) + metricCommon.AddCounter(counter, mInfo) case api.MetricEncodeOperationName("Gauge"): gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels) err := registerer.Register(gauge) @@ -313,10 +152,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - gauges = append(gauges, gaugeInfo{ - gauge: gauge, - info: mInfo, - }) + metricCommon.AddGauge(gauge, mInfo) case api.MetricEncodeOperationName("Histogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) @@ -325,10 +161,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - histos = append(histos, histoInfo{ - histo: hist, - info: mInfo, - }) + metricCommon.AddHist(hist, mInfo) case api.MetricEncodeOperationName("AggHistogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) @@ -337,38 +170,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - aggHistos = append(aggHistos, histoInfo{ - histo: hist, - info: mInfo, - }) + metricCommon.AddAggHist(hist, mInfo) case "default": log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue } } - - log.Debugf("counters = %v", counters) - log.Debugf("gauges = %v", gauges) - log.Debugf("histos = %v", histos) - log.Debugf("aggHistos = %v", aggHistos) - - mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name) - - w := &EncodeProm{ - cfg: params.Encode.Prom, - registerer: registerer, - counters: counters, - gauges: gauges, - histos: histos, - aggHistos: aggHistos, - expiryTime: expiryTime.Duration, - mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric), - mChacheLenMetric: mChacheLenMetric, - exitChan: putils.ExitChannel(), - metricsProcessed: opMetrics.NewCounter(&MetricsProcessed, params.Name), - metricsDropped: opMetrics.NewCounter(&MetricsDropped, params.Name), - errorsCounter: opMetrics.NewCounterVec(&EncodePromErrors), - } - go w.cleanupExpiredEntriesLoop() return w, nil } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 6a2f81a83..2de05e4d0 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -93,30 +93,30 @@ func Test_NewEncodeProm(t *testing.T) { encodeProm, err := initProm(cfg.Parameters[0].Encode.Prom) require.NoError(t, err) - require.Equal(t, 1, len(encodeProm.counters)) - require.Equal(t, 1, len(encodeProm.gauges)) - require.Equal(t, 1, len(encodeProm.histos)) - require.Equal(t, 1, len(encodeProm.aggHistos)) - require.Equal(t, time.Second, encodeProm.expiryTime) + require.Equal(t, 1, len(encodeProm.metricCommon.counters)) + require.Equal(t, 1, len(encodeProm.metricCommon.gauges)) + require.Equal(t, 1, len(encodeProm.metricCommon.histos)) + require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos)) + require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime) - require.Equal(t, encodeProm.gauges[0].info.Name, "Bytes") + require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes") expectedList := []string{"srcAddr", "dstAddr", "srcPort"} - require.Equal(t, encodeProm.gauges[0].info.Labels, expectedList) + require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList) - require.Equal(t, encodeProm.counters[0].info.Name, "Packets") + require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets") expectedList = []string{"srcAddr", "dstAddr", "dstPort"} - require.Equal(t, encodeProm.counters[0].info.Labels, expectedList) + require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList) entry := test.GetExtractMockEntry() encodeProm.Encode(entry) // verify entries are in cache; one for the gauge and one for the counter - entriesMapLen := encodeProm.mCache.GetCacheLen() + entriesMapLen := encodeProm.metricCommon.mCache.GetCacheLen() require.Equal(t, 2, entriesMapLen) // wait a couple seconds so that the entry will expire time.Sleep(2 * time.Second) - encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm.Cleanup) - entriesMapLen = encodeProm.mCache.GetCacheLen() + encodeProm.metricCommon.mCache.CleanupExpiredEntries(encodeProm.metricCommon.expiryTime, encodeProm.Cleanup) + entriesMapLen = encodeProm.metricCommon.mCache.GetCacheLen() require.Equal(t, 0, entriesMapLen) } @@ -631,18 +631,18 @@ func Test_MultipleProm(t *testing.T) { encodeProm3, err := initProm(cfg.Parameters[2].Encode.Prom) require.NoError(t, err) - require.Equal(t, 0, len(encodeProm1.counters)) - require.Equal(t, 1, len(encodeProm1.gauges)) - require.Equal(t, 0, len(encodeProm1.histos)) - require.Equal(t, 0, len(encodeProm1.aggHistos)) - require.Equal(t, 1, len(encodeProm2.counters)) - require.Equal(t, 0, len(encodeProm2.gauges)) - require.Equal(t, 0, len(encodeProm2.histos)) - require.Equal(t, 0, len(encodeProm2.aggHistos)) - require.Equal(t, 0, len(encodeProm3.counters)) - require.Equal(t, 0, len(encodeProm3.gauges)) - require.Equal(t, 1, len(encodeProm3.histos)) - require.Equal(t, 0, len(encodeProm3.aggHistos)) + require.Equal(t, 0, len(encodeProm1.metricCommon.counters)) + require.Equal(t, 1, len(encodeProm1.metricCommon.gauges)) + require.Equal(t, 0, len(encodeProm1.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm1.metricCommon.aggHistos)) + require.Equal(t, 1, len(encodeProm2.metricCommon.counters)) + require.Equal(t, 0, len(encodeProm2.metricCommon.gauges)) + require.Equal(t, 0, len(encodeProm2.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm2.metricCommon.aggHistos)) + require.Equal(t, 0, len(encodeProm3.metricCommon.counters)) + require.Equal(t, 0, len(encodeProm3.metricCommon.gauges)) + require.Equal(t, 1, len(encodeProm3.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm3.metricCommon.aggHistos)) require.Equal(t, "test1_", encodeProm1.cfg.Prefix) require.Equal(t, "test2_", encodeProm2.cfg.Prefix) diff --git a/pkg/pipeline/encode/opentelemetry/opentelemetry.go b/pkg/pipeline/encode/opentelemetry.go similarity index 99% rename from pkg/pipeline/encode/opentelemetry/opentelemetry.go rename to pkg/pipeline/encode/opentelemetry.go index dae256f3a..7dc9c1264 100644 --- a/pkg/pipeline/encode/opentelemetry/opentelemetry.go +++ b/pkg/pipeline/encode/opentelemetry.go @@ -15,7 +15,7 @@ * */ -package opentelemetry +package encode import ( "context" diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go deleted file mode 100644 index 739a7d4e2..000000000 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright (C) 2023 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package opentelemetry - -import ( - "context" - "time" - - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/operational" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" - putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" -) - -// TODO: Refactor the code that is common with encode_prom -const ( - defaultExpiryTime = time.Duration(2 * time.Minute) - flpMeterName = "flp_meter" -) - -type counterInfo struct { - counter *metric.Float64Counter - info *encode.MetricInfo -} - -type gaugeInfo struct { - gauge *metric.Float64ObservableGauge - info *encode.MetricInfo - obs Float64Gauge -} - -// TBD: Handle histograms -/* -type histoInfo struct { - histo *metric.Float64Histogram - info *encode.MetricInfo -} -*/ - -type EncodeOtlpMetrics struct { - cfg api.EncodeOtlpMetrics - ctx context.Context - res *resource.Resource - mp *sdkmetric.MeterProvider - counters []counterInfo - gauges []gaugeInfo - //histos []histoInfo - //aggHistos []histoInfo - expiryTime time.Duration - mCache *putils.TimedCache - exitChan <-chan struct{} - meter metric.Meter - metricsProcessed prometheus.Counter - metricsDropped prometheus.Counter - errorsCounter *prometheus.CounterVec -} - -// Encode encodes a metric to be exported -func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) { - log.Tracef("entering EncodeOtlpMetrics. entry = %v", metricRecord) - - // Process counters - for _, mInfo := range e.counters { - labels, value, _ := e.prepareMetric(metricRecord, mInfo.info) - if labels == nil { - continue - } - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - (*mInfo.counter).Add(e.ctx, value, metric.WithAttributes(attributes...)) - e.metricsProcessed.Inc() - } - - // Process gauges - for _, mInfo := range e.gauges { - labels, value, key := e.prepareMetric(metricRecord, mInfo.info) - if labels == nil { - continue - } - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - mInfo.obs.Set(key, value, attributes) - e.metricsProcessed.Inc() - } - // TBD: Process histograms -} - -func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *encode.MetricInfo) (map[string]string, float64, string) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, 0, "" - } - floatVal, err := utils.ConvertToFloat64(val) - if err != nil { - e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() - return nil, 0, "" - } - - entryLabels, key := encode.ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok := e.mCache.UpdateCacheEntry(key, entryLabels) - if !ok { - e.metricsDropped.Inc() - return nil, 0, "" - } - return entryLabels, floatVal, key -} - -func (e *EncodeOtlpMetrics) extractGenericValue(flow config.GenericMap, info *encode.MetricInfo) interface{} { - for _, pred := range info.FilterPredicates { - if !pred(flow) { - return nil - } - } - if info.ValueKey == "" { - // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 - return 1 - } - val, found := flow[info.ValueKey] - if !found { - e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() - return nil - } - return val -} - -func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { - log.Tracef("entering NewEncodeOtlpMetrics \n") - cfg := api.EncodeOtlpMetrics{} - if params.Encode != nil && params.Encode.OtlpMetrics != nil { - cfg = *params.Encode.OtlpMetrics - } - log.Debugf("NewEncodeOtlpMetrics cfg = %v \n", cfg) - - ctx := context.Background() - res := newResource() - - mp, err := NewOtlpMetricsProvider(ctx, params, res) - if err != nil { - return nil, err - } - meter := mp.Meter( - flpMeterName, - ) - - expiryTime := cfg.ExpiryTime - if expiryTime.Duration == 0 { - expiryTime.Duration = defaultExpiryTime - } - - meterFactory := otel.Meter(flpMeterName) - counters := []counterInfo{} - gauges := []gaugeInfo{} - - for _, mCfg := range cfg.Metrics { - fullMetricName := cfg.Prefix + mCfg.Name - labels := mCfg.Labels - log.Debugf("fullMetricName = %v", fullMetricName) - log.Debugf("Labels = %v", labels) - mInfo := encode.CreateMetricInfo(mCfg) - switch mCfg.Type { - case api.MetricEncodeOperationName("Counter"): - counter, err := meter.Float64Counter(fullMetricName) - if err != nil { - log.Errorf("error during counter creation: %v", err) - return nil, err - } - counters = append(counters, counterInfo{ - counter: &counter, - info: mInfo, - }) - case api.MetricEncodeOperationName("Gauge"): - // at implementation time, only asynchronous gauges are supported by otel in golang - obs := Float64Gauge{observations: make(map[string]Float64GaugeEntry)} - gauge, err := meterFactory.Float64ObservableGauge( - fullMetricName, - metric.WithFloat64Callback(obs.Callback), - ) - if err != nil { - log.Errorf("error during gauge creation: %v", err) - return nil, err - } - gInfo := gaugeInfo{ - info: mInfo, - obs: obs, - gauge: &gauge, - } - gauges = append(gauges, gInfo) - // TBD: handle histograms - case "default": - log.Errorf("invalid metric type = %v, skipping", mCfg.Type) - continue - } - } - - w := &EncodeOtlpMetrics{ - cfg: cfg, - ctx: ctx, - res: res, - mp: mp, - meter: meterFactory, - counters: counters, - gauges: gauges, - expiryTime: expiryTime.Duration, - mCache: putils.NewTimedCache(0, nil), - exitChan: putils.ExitChannel(), - metricsProcessed: opMetrics.NewCounter(&encode.MetricsProcessed, params.Name), - metricsDropped: opMetrics.NewCounter(&encode.MetricsDropped, params.Name), - errorsCounter: opMetrics.NewCounterVec(&encode.EncodePromErrors), - } - go w.cleanupExpiredEntriesLoop() - return w, nil -} - -// Cleanup - callback function from lru cleanup -func (e *EncodeOtlpMetrics) Cleanup(cleanupFunc interface{}) { - // nothing more to do -} - -func (e *EncodeOtlpMetrics) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(e.expiryTime) - for { - select { - case <-e.exitChan: - log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") - return - case <-ticker.C: - e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) - } - } -} - -// At present, golang only supports asynchronous gauge, so we have some function here to support this - -type Float64GaugeEntry struct { - attributes []attribute.KeyValue - value float64 -} - -type Float64Gauge struct { - observations map[string]Float64GaugeEntry -} - -// Callback implements the callback function for the underlying asynchronous gauge -// it observes the current state of all previous Set() calls. -func (f *Float64Gauge) Callback(ctx context.Context, o metric.Float64Observer) error { - for _, fEntry := range f.observations { - o.Observe(fEntry.value, metric.WithAttributes(fEntry.attributes...)) - } - // re-initialize the observed items - f.observations = make(map[string]Float64GaugeEntry) - return nil -} - -func (f *Float64Gauge) Set(key string, val float64, attrs []attribute.KeyValue) { - f.observations[key] = Float64GaugeEntry{ - value: val, - attributes: attrs, - } -} diff --git a/pkg/pipeline/encode/prom_cache_test.go b/pkg/pipeline/encode/prom_cache_test.go index bab6d71e2..649b310e4 100644 --- a/pkg/pipeline/encode/prom_cache_test.go +++ b/pkg/pipeline/encode/prom_cache_test.go @@ -113,12 +113,12 @@ func Test_Prom_Cache1(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 10, promEncode.mCache.GetCacheLen()) + require.Equal(t, 10, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 30, promEncode.mCache.GetCacheLen()) + require.Equal(t, 30, promEncode.metricCommon.mCache.GetCacheLen()) } func Test_Prom_Cache2(t *testing.T) { @@ -133,12 +133,12 @@ func Test_Prom_Cache2(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 20, promEncode.mCache.GetCacheLen()) + require.Equal(t, 20, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 30, promEncode.mCache.GetCacheLen()) + require.Equal(t, 30, promEncode.metricCommon.mCache.GetCacheLen()) } func Test_Prom_Cache3(t *testing.T) { @@ -153,10 +153,10 @@ func Test_Prom_Cache3(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 20, promEncode.mCache.GetCacheLen()) + require.Equal(t, 20, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 80, promEncode.mCache.GetCacheLen()) + require.Equal(t, 80, promEncode.metricCommon.mCache.GetCacheLen()) } diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 769a2a396..b41e0ff41 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -10,7 +10,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/opentelemetry" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/conntrack" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" @@ -431,11 +430,11 @@ func getEncoder(opMetrics *operational.Metrics, params config.StageParam) (encod case api.S3Type: encoder, err = encode.NewEncodeS3(opMetrics, params) case api.OtlpLogsType: - encoder, err = opentelemetry.NewEncodeOtlpLogs(opMetrics, params) + encoder, err = encode.NewEncodeOtlpLogs(opMetrics, params) case api.OtlpMetricsType: - encoder, err = opentelemetry.NewEncodeOtlpMetrics(opMetrics, params) + encoder, err = encode.NewEncodeOtlpMetrics(opMetrics, params) case api.OtlpTracesType: - encoder, err = opentelemetry.NewEncodeOtlpTraces(opMetrics, params) + encoder, err = encode.NewEncodeOtlpTraces(opMetrics, params) case api.NoneType: encoder, _ = encode.NewEncodeNone() default: From d723857f3644f633ea7747e292e547642c87f25e Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Tue, 6 Feb 2024 14:37:49 +0200 Subject: [PATCH 2/5] handle otlp histograms --- pkg/pipeline/encode/encode_otlpmetrics.go | 31 ++++++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/encode/encode_otlpmetrics.go b/pkg/pipeline/encode/encode_otlpmetrics.go index cffe8f3aa..839d9a0d7 100644 --- a/pkg/pipeline/encode/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/encode_otlpmetrics.go @@ -19,7 +19,6 @@ package encode import ( "context" - "fmt" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -66,11 +65,21 @@ func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, labels map[string]string } func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error { - return fmt.Errorf("histogram not yet implemented") + histo := m.(metric.Float64Histogram) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + histo.Record(e.ctx, value, metric.WithAttributes(attributes...)) + return nil } func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { - return fmt.Errorf("agg histogram not yet implemented") + histo := m.(metric.Float64Histogram) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + for _, v := range values { + histo.Record(e.ctx, v, metric.WithAttributes(attributes...)) + } + return nil } func (e *EncodeOtlpMetrics) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} { @@ -140,7 +149,21 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar return nil, err } metricCommon.AddGauge(gauge, mInfo) - // TBD: handle histograms + case api.MetricEncodeOperationName("Histogram"): + var histo metric.Float64Histogram + if len(mCfg.Buckets) == 0 { + histo, err = meter.Float64Histogram(fullMetricName) + } else { + histo, err = meter.Float64Histogram(fullMetricName, + metric.WithExplicitBucketBoundaries(mCfg.Buckets...), + ) + + } + if err != nil { + log.Errorf("error during counter creation: %v", err) + return nil, err + } + metricCommon.AddHist(histo, mInfo) case "default": log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue From 8d5d90758a5f2db1c65dd2062895ea1ca88959ca Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Mon, 12 Feb 2024 04:35:56 +0200 Subject: [PATCH 3/5] moved otlp files to separate directory --- .../encode/{ => opentelemetry}/encode_otlp_test.go | 11 ++++------- .../encode/{ => opentelemetry}/encode_otlplogs.go | 5 +++-- .../{ => opentelemetry}/encode_otlpmetrics.go | 13 ++++++++----- .../encode/{ => opentelemetry}/encode_otlptrace.go | 5 +++-- .../encode/{ => opentelemetry}/opentelemetry.go | 2 +- pkg/pipeline/pipeline_builder.go | 7 ++++--- 6 files changed, 23 insertions(+), 20 deletions(-) rename pkg/pipeline/encode/{ => opentelemetry}/encode_otlp_test.go (94%) rename pkg/pipeline/encode/{ => opentelemetry}/encode_otlplogs.go (93%) rename pkg/pipeline/encode/{ => opentelemetry}/encode_otlpmetrics.go (94%) rename pkg/pipeline/encode/{ => opentelemetry}/encode_otlptrace.go (96%) rename pkg/pipeline/encode/{ => opentelemetry}/opentelemetry.go (99%) diff --git a/pkg/pipeline/encode/encode_otlp_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go similarity index 94% rename from pkg/pipeline/encode/encode_otlp_test.go rename to pkg/pipeline/encode/opentelemetry/encode_otlp_test.go index 5e2206de0..1864f3287 100644 --- a/pkg/pipeline/encode/encode_otlp_test.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go @@ -15,7 +15,7 @@ * */ -package encode +package opentelemetry import ( "encoding/json" @@ -26,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" ) @@ -59,7 +60,7 @@ func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOptio return &fakeOltpLogger{} } -func initNewEncodeOtlpLogs(t *testing.T) Encoder { +func initNewEncodeOtlpLogs(t *testing.T) encode.Encoder { otlpReceivedData = []logs.LogRecord{} v, cfg := test.InitConfig(t, testOtlpConfig) require.NotNil(t, v) @@ -192,9 +193,5 @@ func Test_EncodeOtlpMetrics(t *testing.T) { newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg) require.NoError(t, err) require.NotNil(t, newEncode) - em := newEncode.(*EncodeOtlpMetrics) - require.Equal(t, 2, len(em.metricCommon.counters)) - require.Equal(t, 1, len(em.metricCommon.gauges)) - require.Equal(t, "metric3", em.metricCommon.counters[1].info.Name) - require.Equal(t, []string{"label21", "label22"}, em.metricCommon.gauges[0].info.Labels) + // TODO: add more tests } diff --git a/pkg/pipeline/encode/encode_otlplogs.go b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go similarity index 93% rename from pkg/pipeline/encode/encode_otlplogs.go rename to pkg/pipeline/encode/opentelemetry/encode_otlplogs.go index 041b51b03..d67a073c8 100644 --- a/pkg/pipeline/encode/encode_otlplogs.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go @@ -15,7 +15,7 @@ * */ -package encode +package opentelemetry import ( "context" @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" ) @@ -41,7 +42,7 @@ func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) { e.LogWrite(entry) } -func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { +func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { log.Tracef("entering NewEncodeOtlpLogs \n") cfg := api.EncodeOtlpLogs{} if params.Encode != nil && params.Encode.OtlpLogs != nil { diff --git a/pkg/pipeline/encode/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go similarity index 94% rename from pkg/pipeline/encode/encode_otlpmetrics.go rename to pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 839d9a0d7..69a5444e7 100644 --- a/pkg/pipeline/encode/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -15,14 +15,16 @@ * */ -package encode +package opentelemetry import ( "context" + "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -31,6 +33,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +const defaultExpiryTime = time.Duration(2 * time.Minute) const flpMeterName = "flp_meter" type EncodeOtlpMetrics struct { @@ -39,7 +42,7 @@ type EncodeOtlpMetrics struct { res *resource.Resource mp *sdkmetric.MeterProvider meter metric.Meter - metricCommon *MetricsCommonStruct + metricCommon *encode.MetricsCommonStruct } // Encode encodes a metric to be exported @@ -86,7 +89,7 @@ func (e *EncodeOtlpMetrics) GetChacheEntry(entryLabels map[string]string, m inte return entryLabels } -func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { +func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { log.Tracef("entering NewEncodeOtlpMetrics \n") cfg := api.EncodeOtlpMetrics{} if params.Encode != nil && params.Encode.OtlpMetrics != nil { @@ -120,7 +123,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar meter: meterFactory, } - metricCommon := NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil) + metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil) w.metricCommon = metricCommon for _, mCfg := range cfg.Metrics { @@ -128,7 +131,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar labels := mCfg.Labels log.Debugf("fullMetricName = %v", fullMetricName) log.Debugf("Labels = %v", labels) - mInfo := CreateMetricInfo(mCfg) + mInfo := encode.CreateMetricInfo(mCfg) switch mCfg.Type { case api.MetricEncodeOperationName("Counter"): counter, err := meter.Float64Counter(fullMetricName) diff --git a/pkg/pipeline/encode/encode_otlptrace.go b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go similarity index 96% rename from pkg/pipeline/encode/encode_otlptrace.go rename to pkg/pipeline/encode/opentelemetry/encode_otlptrace.go index 2ba7d0269..e1fe98791 100644 --- a/pkg/pipeline/encode/encode_otlptrace.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go @@ -15,7 +15,7 @@ * */ -package encode +package opentelemetry import ( "context" @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -90,7 +91,7 @@ OUTER: } } -func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { +func NewEncodeOtlpTraces(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { log.Tracef("entering NewEncodeOtlpTraces \n") cfg := api.EncodeOtlpTraces{} if params.Encode != nil && params.Encode.OtlpTraces != nil { diff --git a/pkg/pipeline/encode/opentelemetry.go b/pkg/pipeline/encode/opentelemetry/opentelemetry.go similarity index 99% rename from pkg/pipeline/encode/opentelemetry.go rename to pkg/pipeline/encode/opentelemetry/opentelemetry.go index 7dc9c1264..dae256f3a 100644 --- a/pkg/pipeline/encode/opentelemetry.go +++ b/pkg/pipeline/encode/opentelemetry/opentelemetry.go @@ -15,7 +15,7 @@ * */ -package encode +package opentelemetry import ( "context" diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index b41e0ff41..769a2a396 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -10,6 +10,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode/opentelemetry" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/conntrack" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" @@ -430,11 +431,11 @@ func getEncoder(opMetrics *operational.Metrics, params config.StageParam) (encod case api.S3Type: encoder, err = encode.NewEncodeS3(opMetrics, params) case api.OtlpLogsType: - encoder, err = encode.NewEncodeOtlpLogs(opMetrics, params) + encoder, err = opentelemetry.NewEncodeOtlpLogs(opMetrics, params) case api.OtlpMetricsType: - encoder, err = encode.NewEncodeOtlpMetrics(opMetrics, params) + encoder, err = opentelemetry.NewEncodeOtlpMetrics(opMetrics, params) case api.OtlpTracesType: - encoder, err = encode.NewEncodeOtlpTraces(opMetrics, params) + encoder, err = opentelemetry.NewEncodeOtlpTraces(opMetrics, params) case api.NoneType: encoder, _ = encode.NewEncodeNone() default: From d3ca238aa6aa814382c72dcfdf78bcdaf4197142 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Mon, 12 Feb 2024 04:57:37 +0200 Subject: [PATCH 4/5] added metrics_common file --- pkg/pipeline/encode/metrics_common.go | 279 ++++++++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 pkg/pipeline/encode/metrics_common.go diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go new file mode 100644 index 000000000..c90a5a2e9 --- /dev/null +++ b/pkg/pipeline/encode/metrics_common.go @@ -0,0 +1,279 @@ +/* + * Copyright (C) 2024 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package encode + +import ( + "fmt" + "strings" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +type mInfoStruct struct { + genericMetric interface{} // can be a counter, gauge, or histogram pointer + info *MetricInfo +} + +type MetricsCommonStruct struct { + gauges []mInfoStruct + counters []mInfoStruct + histos []mInfoStruct + aggHistos []mInfoStruct + mCache *putils.TimedCache + mChacheLenMetric prometheus.Gauge + metricsProcessed prometheus.Counter + metricsDropped prometheus.Counter + errorsCounter *prometheus.CounterVec + expiryTime time.Duration + exitChan <-chan struct{} +} + +type MetricsCommonInterface interface { + GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} + ProcessCounter(m interface{}, labels map[string]string, value float64) error + ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error + ProcessHist(m interface{}, labels map[string]string, value float64) error + ProcessAggHist(m interface{}, labels map[string]string, value []float64) error +} + +var ( + metricsProcessed = operational.DefineMetric( + "metrics_processed", + "Number of metrics processed", + operational.TypeCounter, + "stage", + ) + metricsDropped = operational.DefineMetric( + "metrics_dropped", + "Number of metrics dropped", + operational.TypeCounter, + "stage", + ) + encodePromErrors = operational.DefineMetric( + "encode_prom_errors", + "Total errors during metrics generation", + operational.TypeCounter, + "error", "metric", "key", + ) + mChacheLen = operational.DefineMetric( + "encode_prom_metrics_reported", + "Total number of prometheus metrics reported by this stage", + operational.TypeGauge, + "stage", + ) +) + +func (m *MetricsCommonStruct) AddCounter(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.counters = append(m.counters, mStruct) +} + +func (m *MetricsCommonStruct) AddGauge(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.gauges = append(m.gauges, mStruct) +} + +func (m *MetricsCommonStruct) AddHist(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.histos = append(m.histos, mStruct) +} + +func (m *MetricsCommonStruct) AddAggHist(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.aggHistos = append(m.aggHistos, mStruct) +} + +func (e *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, metricRecord config.GenericMap) { + log.Tracef("entering MetricCommonEncode. metricRecord = %v", metricRecord) + + // Process counters + for _, mInfo := range e.counters { + labels, value, _ := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessCounter(mInfo.genericMetric, labels, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process gauges + for _, mInfo := range e.gauges { + labels, value, key := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessGauge(mInfo.genericMetric, labels, value, key) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process histograms + for _, mInfo := range e.histos { + labels, value, _ := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessHist(mInfo.genericMetric, labels, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process pre-aggregated histograms + for _, mInfo := range e.aggHistos { + labels, values := e.prepareAggHisto(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessAggHist(mInfo.genericMetric, labels, values) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } +} + +func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mv interface{}) (map[string]string, float64, string) { + val := m.extractGenericValue(flow, info) + if val == nil { + return nil, 0, "" + } + floatVal, err := utils.ConvertToFloat64(val) + if err != nil { + m.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() + return nil, 0, "" + } + if info.ValueScale != 0 { + floatVal = floatVal / info.ValueScale + } + + entryLabels, key := extractLabelsAndKey(flow, &info.MetricsItem) + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + cacheEntry := mci.GetChacheEntry(entryLabels, mv) + _, ok := m.mCache.UpdateCacheEntry(key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, 0, "" + } + return entryLabels, floatVal, key +} + +func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mc interface{}) (map[string]string, []float64) { + val := m.extractGenericValue(flow, info) + if val == nil { + return nil, nil + } + values, ok := val.([]float64) + if !ok { + m.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() + return nil, nil + } + + entryLabels, key := extractLabelsAndKey(flow, &info.MetricsItem) + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + cacheEntry := mci.GetChacheEntry(entryLabels, mc) + _, ok = m.mCache.UpdateCacheEntry(key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, nil + } + return entryLabels, values +} + +func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} { + for _, pred := range info.FilterPredicates { + if !pred(flow) { + return nil + } + } + if info.ValueKey == "" { + // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 + return 1 + } + val, found := flow[info.ValueKey] + if !found { + m.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() + return nil + } + return val +} + +func extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { + entryLabels := make(map[string]string, len(info.Labels)) + key := strings.Builder{} + key.WriteString(info.Name) + key.WriteRune('|') + for _, t := range info.Labels { + entryLabels[t] = "" + if v, ok := flow[t]; ok { + entryLabels[t] = fmt.Sprintf("%v", v) + } + key.WriteString(entryLabels[t]) + key.WriteRune('|') + } + return entryLabels, key.String() +} + +func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCallback) { + ticker := time.NewTicker(m.expiryTime) + for { + select { + case <-m.exitChan: + log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") + return + case <-ticker.C: + m.mCache.CleanupExpiredEntries(m.expiryTime, callback) + } + } +} + +func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int, name string, expiryTime api.Duration, callback putils.CacheCallback) *MetricsCommonStruct { + mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, name) + m := &MetricsCommonStruct{ + mCache: putils.NewTimedCache(maxCacheEntries, mChacheLenMetric), + mChacheLenMetric: mChacheLenMetric, + metricsProcessed: opMetrics.NewCounter(&metricsProcessed, name), + metricsDropped: opMetrics.NewCounter(&metricsDropped, name), + errorsCounter: opMetrics.NewCounterVec(&encodePromErrors), + expiryTime: expiryTime.Duration, + exitChan: putils.ExitChannel(), + } + go m.cleanupExpiredEntriesLoop(callback) + return m +} From 9b8e42d37d150e2c8e914d8cba9fbdd5aeb46623 Mon Sep 17 00:00:00 2001 From: KalmanMeth Date: Mon, 26 Feb 2024 14:20:29 +0200 Subject: [PATCH 5/5] Update pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go Co-authored-by: Julien Pinsonneau <91894519+jpinsonneau@users.noreply.github.com> --- pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 69a5444e7..c645bd53d 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -163,7 +163,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar } if err != nil { - log.Errorf("error during counter creation: %v", err) + log.Errorf("error during histogram creation: %v", err) return nil, err } metricCommon.AddHist(histo, mInfo)