Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1427: Added config watcher and supporting metrics reloading #607

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func initFlags() {
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.DynamicParameters, "dynamicParameters", "", "json of configmap location for dynamic parameters")
rootCmd.PersistentFlags().StringVar(&opts.MetricsSettings, "metricsSettings", "", "json for global metrics settings")
}

Expand Down
42 changes: 32 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,35 @@ import (
)

type Options struct {
PipeLine string
Parameters string
MetricsSettings string
Health Health
Profile Profile
PipeLine string
Parameters string
DynamicParameters string
MetricsSettings string
Health Health
Profile Profile
}

// (nolint => needs refactoring)
//
//nolint:revive
type ConfigFileStruct struct {
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
}

// DynamicParameters locates the ConfigMap holding the hot-reloadable part of
// the pipeline configuration (see the "dynamicParameters" CLI flag).
type DynamicParameters struct {
	// Namespace of the ConfigMap to watch.
	Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
	// Name of the ConfigMap to watch.
	Name string `yaml:"name,omitempty" json:"name,omitempty"`
	// FileName is the ConfigMap data key that contains the dynamic config.
	FileName string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
	KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
}

// HotReloadStruct is the schema of the dynamic config payload: only the stage
// parameters can be replaced at runtime.
type HotReloadStruct struct {
	Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
}

type Health struct {
Expand Down Expand Up @@ -154,6 +167,15 @@ func ParseConfig(opts *Options) (ConfigFileStruct, error) {
}
logrus.Debugf("params = %v ", out.Parameters)

if opts.DynamicParameters != "" {
err = JSONUnmarshalStrict([]byte(opts.DynamicParameters), &out.DynamicParameters)
if err != nil {
logrus.Errorf("error when parsing dynamic pipeline parameters: %v", err)
return out, err
}
logrus.Debugf("dynamicParams = %v ", out.DynamicParameters)
}

if opts.MetricsSettings != "" {
err = JSONUnmarshalStrict([]byte(opts.MetricsSettings), &out.MetricsSettings)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,33 @@ func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

// isStaticParam reports whether the given stage param is static, i.e. not
// subject to hot reload. Only Prometheus-encode stages are dynamic.
func isStaticParam(param StageParam) bool {
	isPromEncode := param.Encode != nil && param.Encode.Type == api.PromType
	return !isPromEncode
}

// GetStaticStageParams returns the stage params that are not hot-reloadable.
func (b *PipelineBuilderStage) GetStaticStageParams() []StageParam {
	static := make([]StageParam, 0, len(b.pipeline.config))
	for i := range b.pipeline.config {
		if isStaticParam(b.pipeline.config[i]) {
			static = append(static, b.pipeline.config[i])
		}
	}
	return static
}

// GetDynamicStageParams returns the stage params that can be hot-reloaded.
func (b *PipelineBuilderStage) GetDynamicStageParams() []StageParam {
	dynamic := make([]StageParam, 0, len(b.pipeline.config))
	for i := range b.pipeline.config {
		if isStaticParam(b.pipeline.config[i]) {
			continue
		}
		dynamic = append(dynamic, b.pipeline.config[i])
	}
	return dynamic
}

// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
cfs.Pipeline = b.GetStages()
Expand Down
5 changes: 5 additions & 0 deletions pkg/pipeline/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ type encodeNone struct {

// Encoder encodes a flow record before it is stored or exported.
type Encoder interface {
	// Encode processes one flow record.
	Encode(in config.GenericMap)
	// Update applies a new stage configuration; implementations that do not
	// support hot reload log a warning and ignore it.
	Update(config.StageParam)
}

// Encode encodes a flow before being stored; the "none" encoder simply keeps
// the last record it has seen.
func (t *encodeNone) Encode(in config.GenericMap) {
	t.prevRecord = in
}

// Update implements Encoder; the "none" encoder has no reloadable
// configuration, so the request is only logged and dropped.
func (t *encodeNone) Update(_ config.StageParam) {
	log.Warn("Encode None, update not supported")
}

// NewEncodeNone create a new encode
func NewEncodeNone() (Encoder, error) {
log.Debugf("entering NewEncodeNone")
Expand Down
4 changes: 4 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
}
}

// Update implements Encoder; the Kafka encoder has no reloadable
// configuration, so the request is only logged and dropped.
func (r *encodeKafka) Update(_ config.StageParam) {
	log.Warn("Encode Kafka, update not supported")
}

// NewEncodeKafka create a new writer to kafka
func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
log.Debugf("entering NewEncodeKafka")
Expand Down
155 changes: 151 additions & 4 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package encode

import (
"reflect"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -35,12 +37,14 @@ type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
metricCommon *MetricsCommonStruct
updateChan chan config.StageParam
}

// Encode encodes a metric before being stored; the heavy work is done by the
// MetricCommonEncode. After each record it also polls for a pending config
// update, so reloads are applied on the flow-processing goroutine and the
// metric maps need no extra locking.
func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
	log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
	e.metricCommon.MetricCommonEncode(e, metricRecord)
	e.checkConfUpdate()
}

func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
Expand Down Expand Up @@ -102,6 +106,144 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

// addCounter creates and registers a counter vector under fullMetricName and
// records it in the common metrics store. Registration errors are logged but
// do not prevent the metric from being tracked.
func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
	opts := prometheus.CounterOpts{Name: fullMetricName, Help: ""}
	counter := prometheus.NewCounterVec(opts, mInfo.Labels)
	if err := e.registerer.Register(counter); err != nil {
		log.Errorf("error during prometheus.Register: %v", err)
	}
	e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
}

// addGauge creates and registers a gauge vector under fullMetricName and
// records it in the common metrics store. Registration errors are logged but
// do not prevent the metric from being tracked.
func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
	opts := prometheus.GaugeOpts{Name: fullMetricName, Help: ""}
	gauge := prometheus.NewGaugeVec(opts, mInfo.Labels)
	if err := e.registerer.Register(gauge); err != nil {
		log.Errorf("error during prometheus.Register: %v", err)
	}
	e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
}
// addHistogram creates and registers a histogram vector under fullMetricName
// and records it in the common metrics store.
// NOTE(review): unlike NewEncodeProm, which passes Buckets from the metric
// config, this omits Buckets, so a hot-reloaded histogram falls back to the
// client default buckets — confirm this is intended.
func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {
	histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
	err := e.registerer.Register(histogram)
	if err != nil {
		log.Errorf("error during prometheus.Register: %v", err)
	}
	e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
}
// addAgghistogram creates and registers an aggregated-histogram vector under
// fullMetricName and records it in the common metrics store.
// NOTE(review): as with addHistogram, Buckets from the metric config are not
// set here, unlike in NewEncodeProm — confirm this is intended.
func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {
	agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
	err := e.registerer.Register(agghistogram)
	if err != nil {
		log.Errorf("error during prometheus.Register: %v", err)
	}
	e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
}

// unregisterMetric removes the metric from the Prometheus registry when it is
// a prometheus.Collector; any other value is silently ignored.
func (e *EncodeProm) unregisterMetric(c interface{}) {
	collector, ok := c.(prometheus.Collector)
	if !ok {
		return
	}
	e.registerer.Unregister(collector)
}

// cleanDeletedGeneric unregisters and forgets every metric in the given map
// that no longer exists in newCfg: either its full name does not carry the
// (possibly changed) prefix, or no configured metric item bears its name.
//
// Fix: the original unregistered a prefix-mismatched metric twice (inline
// Unregister plus the unregisterMetric helper) and then fell through to the
// name lookup on the already-deleted entry, potentially unregistering and
// deleting it a third time. Each stale metric is now handled exactly once.
func (e *EncodeProm) cleanDeletedGeneric(newCfg api.PromEncode, metrics map[string]mInfoStruct) {
	// Deleting from a map while ranging over it is safe in Go.
	for fullName, m := range metrics {
		if !strings.HasPrefix(fullName, newCfg.Prefix) {
			// Prefix changed: this metric cannot match any new config entry.
			e.unregisterMetric(m.genericMetric)
			delete(metrics, fullName)
			continue
		}
		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
		found := false
		for i := range newCfg.Metrics {
			if metricName == newCfg.Metrics[i].Name {
				found = true
				break
			}
		}
		if !found {
			e.unregisterMetric(m.genericMetric)
			delete(metrics, fullName)
		}
	}
}

// cleanDeletedMetrics drops, from every metric family, the metrics that no
// longer appear in the new configuration.
func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
	families := []map[string]mInfoStruct{
		e.metricCommon.counters,
		e.metricCommon.gauges,
		e.metricCommon.histos,
		e.metricCommon.aggHistos,
	}
	for _, family := range families {
		e.cleanDeletedGeneric(newCfg, family)
	}
}

// checkConfUpdate applies a pending configuration update, if any, without
// blocking. Metrics removed from the new config are unregistered, metrics
// whose definition changed are re-registered, and new metrics are added.
// It is called from Encode, i.e. on the flow-processing goroutine, so the
// metric maps need no extra synchronization.
//
// Fix: the invalid-type branch was written `case "default":`, which only
// matched the literal string "default"; genuinely unknown metric types were
// silently skipped without any log. It is now a real `default:` case.
func (e *EncodeProm) checkConfUpdate() {
	select {
	case stage := <-e.updateChan:
		cfg := api.PromEncode{}
		if stage.Encode != nil && stage.Encode.Prom != nil {
			cfg = *stage.Encode.Prom
		}

		e.cleanDeletedMetrics(cfg)

		for i := range cfg.Metrics {
			fullMetricName := cfg.Prefix + cfg.Metrics[i].Name
			mInfo := CreateMetricInfo(&cfg.Metrics[i])
			switch cfg.Metrics[i].Type {
			case api.MetricCounter:
				if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok {
					// Re-register only when the definition actually changed.
					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
						e.unregisterMetric(oldMetric.genericMetric)
						e.addCounter(fullMetricName, mInfo)
					}
				} else {
					// New metric
					e.addCounter(fullMetricName, mInfo)
				}
			case api.MetricGauge:
				if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok {
					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
						e.unregisterMetric(oldMetric.genericMetric)
						e.addGauge(fullMetricName, mInfo)
					}
				} else {
					// New metric
					e.addGauge(fullMetricName, mInfo)
				}
			case api.MetricHistogram:
				if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok {
					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
						e.unregisterMetric(oldMetric.genericMetric)
						e.addHistogram(fullMetricName, mInfo)
					}
				} else {
					// New metric
					e.addHistogram(fullMetricName, mInfo)
				}
			case api.MetricAggHistogram:
				if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok {
					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
						e.unregisterMetric(oldMetric.genericMetric)
						e.addAgghistogram(fullMetricName, mInfo)
					}
				} else {
					// New metric
					e.addAgghistogram(fullMetricName, mInfo)
				}
			default:
				log.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type)
				continue
			}
		}
	default:
		// No pending update.
		return
	}
}

func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
cfg := api.PromEncode{}
if params.Encode != nil && params.Encode.Prom != nil {
Expand All @@ -126,6 +268,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
updateChan: make(chan config.StageParam),
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
Expand All @@ -146,15 +289,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddCounter(counter, mInfo)
metricCommon.AddCounter(fullMetricName, counter, mInfo)
case api.MetricGauge:
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddGauge(gauge, mInfo)
metricCommon.AddGauge(fullMetricName, gauge, mInfo)
case api.MetricHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -163,7 +306,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddHist(hist, mInfo)
metricCommon.AddHist(fullMetricName, hist, mInfo)
case api.MetricAggHistogram:
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
Expand All @@ -172,11 +315,15 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
}
metricCommon.AddAggHist(hist, mInfo)
metricCommon.AddAggHist(fullMetricName, hist, mInfo)
case "default":
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
return w, nil
}

// Update queues a new stage configuration for this encoder; it is picked up
// asynchronously by checkConfUpdate on a subsequent call to Encode.
// NOTE(review): updateChan is unbuffered and only drained from Encode, so
// this call blocks until at least one flow record is processed — confirm the
// config-watcher goroutine tolerates that.
//
// Fix: the parameter was named `config`, shadowing the imported `config`
// package inside the function body.
func (e *EncodeProm) Update(param config.StageParam) {
	e.updateChan <- param
}
8 changes: 4 additions & 4 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos))
require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime)

require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes")
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Name, "Bytes")
expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Labels, expectedList)

require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets")
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Name, "Packets")
expectedList = []string{"srcAddr", "dstAddr", "dstPort"}
require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList)
require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Labels, expectedList)
entry := test.GetExtractMockEntry()
encodeProm.Encode(entry)

Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/encode/encode_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (s *encodeS3) GenerateStoreHeader(flows []config.GenericMap, startTime time
return object
}

// Update implements Encoder; the S3 encoder has no reloadable configuration,
// so the request is only logged and dropped.
//
// Fix: the warning said "Encode S3 Writer", duplicating the message of
// (*encodeS3Writer).Update and making logs ambiguous; this receiver is
// encodeS3, not the writer.
func (s *encodeS3) Update(_ config.StageParam) {
	log.Warn("Encode S3, update not supported")
}

func (s *encodeS3) createObjectTimeoutLoop() {
log.Debugf("entering createObjectTimeoutLoop")
ticker := time.NewTicker(s.s3Params.WriteTimeout.Duration)
Expand Down Expand Up @@ -215,3 +219,7 @@ func (e *encodeS3Writer) putObject(bucket string, objectName string, object map[
log.Debugf("uploadInfo = %v", uploadInfo)
return err
}

// Update implements Encoder; the S3 writer has no reloadable configuration,
// so the request is only logged and dropped.
func (e *encodeS3Writer) Update(_ config.StageParam) {
	log.Warn("Encode S3 Writer, update not supported")
}
Loading
Loading