Skip to content

Commit

Permalink
Added config watcher and supporting metrics reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Mar 22, 2024
1 parent b1ccc26 commit 4750c83
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 68 deletions.
22 changes: 17 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,23 @@ type Options struct {
//
//nolint:revive
type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
}

type DynamicParameters struct {
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
FileName string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
}

type ConfigHotReloadStruct struct {
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
}

type Health struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ type encodeNone struct {

type Encoder interface {
Encode(in config.GenericMap)
Update(config.StageParam)
}

// Encode encodes a flow before being stored
func (t *encodeNone) Encode(in config.GenericMap) {
t.prevRecord = in
}

func (t *encodeNone) Update(_ config.StageParam) {
log.Warn("Encode None, update not supported")
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
Expand Down
4 changes: 4 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
}
}

func (t *encodeKafka) Update(_ config.StageParam) {
log.Warn("Encode Kafka, update not supported")
}

// NewEncodeKafka create a new writer to kafka
func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
log.Debugf("entering NewEncodeKafka")
Expand Down
226 changes: 222 additions & 4 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package encode

import (
"reflect"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -35,12 +37,14 @@ type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
updateChan chan config.StageParam
}

// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
e.metricCommon.MetricCommonEncode(e, metricRecord)
e.checkConfUpdate()
}

func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
Expand Down Expand Up @@ -102,6 +106,215 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
}
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(histogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
}
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(agghistogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
}

func (e *EncodeProm) unregisterMetric(c interface{}) {
if c, ok := c.(prometheus.Collector); ok {
e.registerer.Unregister(c)
}

}

func (e *EncodeProm) cleanDeletedCounters(newCfg api.PromEncode) {
for fullName, counter := range e.metricCommon.counters {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.unregisterMetric(counter.genericMetric)
delete(e.metricCommon.counters, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.unregisterMetric(counter)
delete(e.metricCommon.counters, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedGauges(newCfg api.PromEncode) {
for fullName, gauge := range e.metricCommon.gauges {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
if c, ok := gauge.genericMetric.(prometheus.Collector); ok {
e.registerer.Unregister(c)
}
e.unregisterMetric(gauge.genericMetric)
delete(e.metricCommon.gauges, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.unregisterMetric(gauge.genericMetric)
delete(e.metricCommon.gauges, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedHistograms(newCfg api.PromEncode) {
for fullName, histogram := range e.metricCommon.histos {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.unregisterMetric(histogram.genericMetric)
delete(e.metricCommon.histos, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.unregisterMetric(histogram.genericMetric)
delete(e.metricCommon.histos, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedAggHistograms(newCfg api.PromEncode) {
for fullName, aggHisto := range e.metricCommon.aggHistos {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
e.unregisterMetric(aggHisto.genericMetric)
delete(e.metricCommon.aggHistos, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for _, mCfg := range newCfg.Metrics {
if metricName == mCfg.Name {
found = true
break
}
}
if !found {
e.unregisterMetric(aggHisto.genericMetric)
delete(e.metricCommon.aggHistos, fullName)

}
}

}

func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
e.cleanDeletedCounters(newCfg)
e.cleanDeletedGauges(newCfg)
e.cleanDeletedHistograms(newCfg)
e.cleanDeletedAggHistograms(newCfg)
}

func (e *EncodeProm) checkConfUpdate() {
select {
case stage := <-e.updateChan:
cfg := api.PromEncode{}
if stage.Encode != nil && stage.Encode.Prom != nil {
cfg = *stage.Encode.Prom
}

e.cleanDeletedMetrics(cfg)

for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
mInfo := CreateMetricInfo(&mCfg)
switch mCfg.Type {
case api.MetricEncodeOperationName("Counter"):
if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addCounter(fullMetricName, mInfo)
}
} else {
// New metric
e.addCounter(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("Gauge"):
if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addGauge(fullMetricName, mInfo)
}
} else {
// New metric
e.addGauge(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("Histogram"):
if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addHistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addHistogram(fullMetricName, mInfo)
}
case api.MetricEncodeOperationName("AggHistogram"):
if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addAgghistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addAgghistogram(fullMetricName, mInfo)
}
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}

}
default:
//Nothing to do
return
}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -126,6 +339,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
updateChan: make(chan config.StageParam),
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
Expand All @@ -146,15 +360,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddCounter(counter, mInfo)
metricCommon.AddCounter(fullMetricName, counter, mInfo)
case api.MetricEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddGauge(gauge, mInfo)
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
case api.MetricEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -163,7 +377,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddHist(hist, mInfo)
metricCommon.AddHist(fullMetricName, hist, mInfo)
case api.MetricEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -172,11 +386,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddAggHist(hist, mInfo)
metricCommon.AddAggHist(fullMetricName, hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
return w, nil
}

func (t *EncodeProm) Update(config config.StageParam) {
t.updateChan <- config
}
8 changes: 4 additions & 4 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos))
require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime)

require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes")
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Name, "Bytes")
expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Labels, expectedList)

require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets")
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Name, "Packets")
expectedList = []string{"srcAddr", "dstAddr", "dstPort"}
require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Labels, expectedList)
entry := test.GetExtractMockEntry()
encodeProm.Encode(entry)

Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/encode/encode_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (s *encodeS3) GenerateStoreHeader(flows []config.GenericMap, startTime time
return object
}

func (t *encodeS3) Update(_ config.StageParam) {
log.Warn("Encode S3 Writer, update not supported")
}

func (s *encodeS3) createObjectTimeoutLoop() {
log.Debugf("entering createObjectTimeoutLoop")
ticker := time.NewTicker(s.s3Params.WriteTimeout.Duration)
Expand Down Expand Up @@ -215,3 +219,7 @@ func (e *encodeS3Writer) putObject(bucket string, objectName string, object map[
log.Debugf("uploadInfo = %v", uploadInfo)
return err
}

func (t *encodeS3Writer) Update(_ config.StageParam) {
log.Warn("Encode S3 Writer, update not supported")
}
Loading

0 comments on commit 4750c83

Please sign in to comment.