Skip to content

Commit

Permalink
Added config watcher and supporting metrics reloading (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade authored Apr 26, 2024
1 parent c9425a9 commit 2ab1f8d
Show file tree
Hide file tree
Showing 17 changed files with 487 additions and 73 deletions.
1 change: 1 addition & 0 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func initFlags() {
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.DynamicParameters, "dynamicParameters", "", "json of configmap location for dynamic parameters")
rootCmd.PersistentFlags().StringVar(&opts.MetricsSettings, "metricsSettings", "", "json for global metrics settings")
}

Expand Down
42 changes: 32 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,35 @@ import (
)

type Options struct {
PipeLine string
Parameters string
MetricsSettings string
Health Health
Profile Profile
PipeLine string
Parameters string
DynamicParameters string
MetricsSettings string
Health Health
Profile Profile
}

// (nolint => needs refactoring)
//
//nolint:revive
type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
}

type DynamicParameters struct {
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
FileName string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
}

type HotReloadStruct struct {
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
}

type Health struct {
Expand Down Expand Up @@ -154,6 +167,15 @@ func ParseConfig(opts *Options) (ConfigFileStruct, error) {
}
logrus.Debugf("params = %v ", out.Parameters)

if opts.DynamicParameters != "" {
err = JSONUnmarshalStrict([]byte(opts.DynamicParameters), &out.DynamicParameters)
if err != nil {
logrus.Errorf("error when parsing dynamic pipeline parameters: %v", err)
return out, err
}
logrus.Debugf("dynamicParams = %v ", out.DynamicParameters)
}

if opts.MetricsSettings != "" {
err = JSONUnmarshalStrict([]byte(opts.MetricsSettings), &out.MetricsSettings)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,33 @@ func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

func isStaticParam(param StageParam) bool {
if param.Encode != nil && param.Encode.Type == api.PromType {
return false
}
return true
}

func (b *PipelineBuilderStage) GetStaticStageParams() []StageParam {
res := []StageParam{}
for _, param := range b.pipeline.config {
if isStaticParam(param) {
res = append(res, param)
}
}
return res
}

func (b *PipelineBuilderStage) GetDynamicStageParams() []StageParam {
res := []StageParam{}
for _, param := range b.pipeline.config {
if !isStaticParam(param) {
res = append(res, param)
}
}
return res
}

// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
cfs.Pipeline = b.GetStages()
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ type encodeNone struct {

type Encoder interface {
Encode(in config.GenericMap)
Update(config.StageParam)
}

// Encode encodes a flow before being stored
func (t *encodeNone) Encode(in config.GenericMap) {
t.prevRecord = in
}

func (t *encodeNone) Update(_ config.StageParam) {
log.Warn("Encode None, update not supported")
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
Expand Down
4 changes: 4 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
}
}

func (r *encodeKafka) Update(_ config.StageParam) {
log.Warn("Encode Kafka, update not supported")
}

// NewEncodeKafka create a new writer to kafka
func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
log.Debugf("entering NewEncodeKafka")
Expand Down
155 changes: 151 additions & 4 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package encode

import (
"reflect"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -35,12 +37,14 @@ type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
updateChan chan config.StageParam
}

// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
e.metricCommon.MetricCommonEncode(e, metricRecord)
e.checkConfUpdate()
}

func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
Expand Down Expand Up @@ -102,6 +106,144 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
}

func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
}
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(histogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
}
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
err := e.registerer.Register(agghistogram)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
}
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
}

func (e *EncodeProm) unregisterMetric(c interface{}) {
if c, ok := c.(prometheus.Collector); ok {
e.registerer.Unregister(c)
}

}

func (e *EncodeProm) cleanDeletedGeneric(newCfg api.PromEncode, metrics map[string]mInfoStruct) {
for fullName, m := range metrics {
if !strings.HasPrefix(fullName, newCfg.Prefix) {
if c, ok := m.genericMetric.(prometheus.Collector); ok {
e.registerer.Unregister(c)
}
e.unregisterMetric(m.genericMetric)
delete(metrics, fullName)
}
metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
found := false
for i := range newCfg.Metrics {
if metricName == newCfg.Metrics[i].Name {
found = true
break
}
}
if !found {
e.unregisterMetric(m.genericMetric)
delete(metrics, fullName)
}
}
}

func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
e.cleanDeletedGeneric(newCfg, e.metricCommon.counters)
e.cleanDeletedGeneric(newCfg, e.metricCommon.gauges)
e.cleanDeletedGeneric(newCfg, e.metricCommon.histos)
e.cleanDeletedGeneric(newCfg, e.metricCommon.aggHistos)
}

func (e *EncodeProm) checkConfUpdate() {
select {
case stage := <-e.updateChan:
cfg := api.PromEncode{}
if stage.Encode != nil && stage.Encode.Prom != nil {
cfg = *stage.Encode.Prom
}

e.cleanDeletedMetrics(cfg)

for i := range cfg.Metrics {
fullMetricName := cfg.Prefix + cfg.Metrics[i].Name
mInfo := CreateMetricInfo(&cfg.Metrics[i])
switch cfg.Metrics[i].Type {
case api.MetricCounter:
if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addCounter(fullMetricName, mInfo)
}
} else {
// New metric
e.addCounter(fullMetricName, mInfo)
}
case api.MetricGauge:
if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addGauge(fullMetricName, mInfo)
}
} else {
// New metric
e.addGauge(fullMetricName, mInfo)
}
case api.MetricHistogram:
if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addHistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addHistogram(fullMetricName, mInfo)
}
case api.MetricAggHistogram:
if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok {
if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
e.unregisterMetric(oldMetric.genericMetric)
e.addAgghistogram(fullMetricName, mInfo)
}
} else {
// New metric
e.addAgghistogram(fullMetricName, mInfo)
}
case "default":
log.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type)
continue
}

}
default:
//Nothing to do
return
}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -126,6 +268,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
updateChan: make(chan config.StageParam),
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
Expand All @@ -146,15 +289,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddCounter(counter, mInfo)
metricCommon.AddCounter(fullMetricName, counter, mInfo)
case api.MetricGauge:
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddGauge(gauge, mInfo)
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
case api.MetricHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -163,7 +306,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddHist(hist, mInfo)
metricCommon.AddHist(fullMetricName, hist, mInfo)
case api.MetricAggHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -172,11 +315,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddAggHist(hist, mInfo)
metricCommon.AddAggHist(fullMetricName, hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
return w, nil
}

func (e *EncodeProm) Update(config config.StageParam) {
e.updateChan <- config
}
8 changes: 4 additions & 4 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos))
require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime)

require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes")
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Name, "Bytes")
expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Labels, expectedList)

require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets")
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Name, "Packets")
expectedList = []string{"srcAddr", "dstAddr", "dstPort"}
require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Labels, expectedList)
entry := test.GetExtractMockEntry()
encodeProm.Encode(entry)

Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/encode/encode_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (s *encodeS3) GenerateStoreHeader(flows []config.GenericMap, startTime time
return object
}

func (s *encodeS3) Update(_ config.StageParam) {
log.Warn("Encode S3 Writer, update not supported")
}

func (s *encodeS3) createObjectTimeoutLoop() {
log.Debugf("entering createObjectTimeoutLoop")
ticker := time.NewTicker(s.s3Params.WriteTimeout.Duration)
Expand Down Expand Up @@ -215,3 +219,7 @@ func (e *encodeS3Writer) putObject(bucket string, objectName string, object map[
log.Debugf("uploadInfo = %v", uploadInfo)
return err
}

func (e *encodeS3Writer) Update(_ config.StageParam) {
log.Warn("Encode S3 Writer, update not supported")
}
Loading

0 comments on commit 2ab1f8d

Please sign in to comment.