From 4750c83a755b528a1cfbddcb7603d07d33c16c76 Mon Sep 17 00:00:00 2001
From: Olivier Cazade
Date: Thu, 22 Feb 2024 15:10:44 +0000
Subject: [PATCH] Added config watcher and support for metrics reloading

---
 pkg/config/config.go                          |  22 +-
 pkg/pipeline/encode/encode.go                 |   5 +
 pkg/pipeline/encode/encode_kafka.go           |   4 +
 pkg/pipeline/encode/encode_prom.go            | 226 +++++++++++++++++-
 pkg/pipeline/encode/encode_prom_test.go       |   8 +-
 pkg/pipeline/encode/encode_s3.go              |   8 +
 pkg/pipeline/encode/metrics_common.go         |  28 ++-
 .../encode/opentelemetry/encode_otlplogs.go   |   4 +
 .../opentelemetry/encode_otlpmetrics.go       |  10 +-
 .../encode/opentelemetry/encode_otlptrace.go  |   4 +
 pkg/pipeline/pipeline.go                      |  19 +-
 pkg/pipeline/pipeline_builder.go              |  11 +-
 pkg/pipeline/pipeline_watcher.go              | 110 +++++++++
 .../kubernetes/informers/informers.go         |  34 +--
 pkg/utils/kubernetes.go                       |  41 ++++
 15 files changed, 466 insertions(+), 68 deletions(-)
 create mode 100644 pkg/pipeline/pipeline_watcher.go
 create mode 100644 pkg/utils/kubernetes.go

diff --git a/pkg/config/config.go b/pkg/config/config.go
index 1cf8ca896..02bcdf697 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -38,11 +38,23 @@ type Options struct {
 //
 //nolint:revive
 type ConfigFileStruct struct {
-	LogLevel        string          `yaml:"log-level,omitempty" json:"log-level,omitempty"`
-	MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
-	Pipeline        []Stage         `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
-	Parameters      []StageParam    `yaml:"parameters,omitempty" json:"parameters,omitempty"`
-	PerfSettings    PerfSettings    `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
+	LogLevel          string            `yaml:"log-level,omitempty" json:"log-level,omitempty"`
+	MetricsSettings   MetricsSettings   `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
+	Pipeline          []Stage           `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
+	Parameters        []StageParam      `yaml:"parameters,omitempty" json:"parameters,omitempty"`
+	PerfSettings      PerfSettings      `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
+	DynamicParameters DynamicParameters `yaml:"dynamicParameters,omitempty" json:"dynamicParameters,omitempty"`
+}
+
+type DynamicParameters struct {
+	Namespace      string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
+	Name           string `yaml:"name,omitempty" json:"name,omitempty"`
+	FileName       string `yaml:"fileName,omitempty" json:"fileName,omitempty"`
+	KubeConfigPath string `yaml:"kubeConfigPath,omitempty" json:"kubeConfigPath,omitempty" doc:"path to kubeconfig file (optional)"`
+}
+
+type ConfigHotReloadStruct struct {
+	Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
 }
 
 type Health struct {
diff --git a/pkg/pipeline/encode/encode.go b/pkg/pipeline/encode/encode.go
index a7c6eefa8..e754ed1d4 100644
--- a/pkg/pipeline/encode/encode.go
+++ b/pkg/pipeline/encode/encode.go
@@ -28,6 +28,7 @@ type encodeNone struct {
 
 type Encoder interface {
 	Encode(in config.GenericMap)
+	Update(config.StageParam)
 }
 
 // Encode encodes a flow before being stored
@@ -35,6 +36,10 @@ func (t *encodeNone) Encode(in config.GenericMap) {
 	t.prevRecord = in
 }
 
+func (t *encodeNone) Update(_ config.StageParam) {
+	log.Warn("Encode None, update not supported")
+}
+
 // NewEncodeNone create a new encode
 func NewEncodeNone() (Encoder, error) {
 	log.Debugf("entering NewEncodeNone")
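For context, the new `dynamicParameters` section is what operators add to the pipeline configuration to opt in to hot reload. A minimal sketch of how it might be populated (all values here are hypothetical examples, not defaults):

```go
// Hypothetical opt-in: watch key "dynamic.json" in ConfigMap
// netobserv/flp-config-dynamic. Hot reload stays disabled if any of
// Namespace, Name or FileName is left empty (see newPipelineConfigWatcher
// later in this patch).
cfg := config.ConfigFileStruct{
	DynamicParameters: config.DynamicParameters{
		Namespace: "netobserv",
		Name:      "flp-config-dynamic",
		FileName:  "dynamic.json",
		// KubeConfigPath is optional; when empty, KUBECONFIG, then
		// $HOME/.kube/config, then the in-cluster config are tried.
	},
}
```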
diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go
index 7dcf0f24f..33d3f5262 100644
--- a/pkg/pipeline/encode/encode_kafka.go
+++ b/pkg/pipeline/encode/encode_kafka.go
@@ -66,6 +66,10 @@ func (r *encodeKafka) Encode(entry config.GenericMap) {
 	}
 }
 
+func (t *encodeKafka) Update(_ config.StageParam) {
+	log.Warn("Encode Kafka, update not supported")
+}
+
 // NewEncodeKafka create a new writer to kafka
 func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
 	log.Debugf("entering NewEncodeKafka")
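Every Encoder must now implement Update. As a sketch of the contract (the encodeStdout type below is hypothetical, not part of the patch, and assumes the package's usual imports), a stage that cannot be reconfigured at runtime simply logs and ignores the update, mirroring the Kafka stub above:

```go
// Hypothetical encoder illustrating the extended Encoder contract.
type encodeStdout struct{}

func (e *encodeStdout) Encode(in config.GenericMap) {
	fmt.Println(in)
}

// Stages without hot-reload support acknowledge the call and do nothing.
func (e *encodeStdout) Update(_ config.StageParam) {
	log.Warn("Encode Stdout, update not supported")
}
```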
diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go
index d9f1de669..ed0c1041c 100644
--- a/pkg/pipeline/encode/encode_prom.go
+++ b/pkg/pipeline/encode/encode_prom.go
@@ -18,6 +18,8 @@ package encode
 
 import (
+	"reflect"
+	"strings"
 	"time"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -35,12 +37,14 @@ type EncodeProm struct {
 	cfg          *api.PromEncode
 	registerer   prometheus.Registerer
 	metricCommon *MetricsCommonStruct
+	updateChan   chan config.StageParam
 }
 
 // Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode
 func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
 	log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord)
 	e.metricCommon.MetricCommonEncode(e, metricRecord)
+	e.checkConfUpdate()
 }
 
 func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error {
@@ -102,6 +106,215 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) {
 	cleanupFunc.(func())()
 }
 
+func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) {
+	counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
+	err := e.registerer.Register(counter)
+	if err != nil {
+		log.Errorf("error during prometheus.Register: %v", err)
+	}
+	e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
+}
+
+func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) {
+	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels)
+	err := e.registerer.Register(gauge)
+	if err != nil {
+		log.Errorf("error during prometheus.Register: %v", err)
+	}
+	e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
+}
+
+func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) {
+	histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, mInfo.Labels)
+	err := e.registerer.Register(histogram)
+	if err != nil {
+		log.Errorf("error during prometheus.Register: %v", err)
+	}
+	e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
+}
+
+func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) {
+	agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mInfo.Buckets}, mInfo.Labels)
+	err := e.registerer.Register(agghistogram)
+	if err != nil {
+		log.Errorf("error during prometheus.Register: %v", err)
+	}
+	e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
+}
+
+func (e *EncodeProm) unregisterMetric(c interface{}) {
+	if c, ok := c.(prometheus.Collector); ok {
+		e.registerer.Unregister(c)
+	}
+}
+
+func (e *EncodeProm) cleanDeletedCounters(newCfg api.PromEncode) {
+	for fullName, counter := range e.metricCommon.counters {
+		if !strings.HasPrefix(fullName, newCfg.Prefix) {
+			e.unregisterMetric(counter.genericMetric)
+			delete(e.metricCommon.counters, fullName)
+			continue
+		}
+		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
+		found := false
+		for _, mCfg := range newCfg.Metrics {
+			if metricName == mCfg.Name {
+				found = true
+				break
+			}
+		}
+		if !found {
+			e.unregisterMetric(counter.genericMetric)
+			delete(e.metricCommon.counters, fullName)
+		}
+	}
+}
+
+func (e *EncodeProm) cleanDeletedGauges(newCfg api.PromEncode) {
+	for fullName, gauge := range e.metricCommon.gauges {
+		if !strings.HasPrefix(fullName, newCfg.Prefix) {
+			e.unregisterMetric(gauge.genericMetric)
+			delete(e.metricCommon.gauges, fullName)
+			continue
+		}
+		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
+		found := false
+		for _, mCfg := range newCfg.Metrics {
+			if metricName == mCfg.Name {
+				found = true
+				break
+			}
+		}
+		if !found {
+			e.unregisterMetric(gauge.genericMetric)
+			delete(e.metricCommon.gauges, fullName)
+		}
+	}
+}
+
+func (e *EncodeProm) cleanDeletedHistograms(newCfg api.PromEncode) {
+	for fullName, histogram := range e.metricCommon.histos {
+		if !strings.HasPrefix(fullName, newCfg.Prefix) {
+			e.unregisterMetric(histogram.genericMetric)
+			delete(e.metricCommon.histos, fullName)
+			continue
+		}
+		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
+		found := false
+		for _, mCfg := range newCfg.Metrics {
+			if metricName == mCfg.Name {
+				found = true
+				break
+			}
+		}
+		if !found {
+			e.unregisterMetric(histogram.genericMetric)
+			delete(e.metricCommon.histos, fullName)
+		}
+	}
+}
+
+func (e *EncodeProm) cleanDeletedAggHistograms(newCfg api.PromEncode) {
+	for fullName, aggHisto := range e.metricCommon.aggHistos {
+		if !strings.HasPrefix(fullName, newCfg.Prefix) {
+			e.unregisterMetric(aggHisto.genericMetric)
+			delete(e.metricCommon.aggHistos, fullName)
+			continue
+		}
+		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
+		found := false
+		for _, mCfg := range newCfg.Metrics {
+			if metricName == mCfg.Name {
+				found = true
+				break
+			}
+		}
+		if !found {
+			e.unregisterMetric(aggHisto.genericMetric)
+			delete(e.metricCommon.aggHistos, fullName)
+		}
+	}
+}
+
+func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) {
+	e.cleanDeletedCounters(newCfg)
+	e.cleanDeletedGauges(newCfg)
+	e.cleanDeletedHistograms(newCfg)
+	e.cleanDeletedAggHistograms(newCfg)
+}
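The four cleanup loops are identical except for the map they walk; a possible follow-up refactor is a single helper (a sketch only, assuming mInfoStruct as defined in metrics_common.go below):

```go
// Deleting map entries while ranging over the map is safe in Go.
func (e *EncodeProm) cleanDeleted(metrics map[string]mInfoStruct, newCfg *api.PromEncode) {
	for fullName, m := range metrics {
		if !strings.HasPrefix(fullName, newCfg.Prefix) {
			e.unregisterMetric(m.genericMetric)
			delete(metrics, fullName)
			continue
		}
		metricName := strings.TrimPrefix(fullName, newCfg.Prefix)
		found := false
		for i := range newCfg.Metrics {
			if newCfg.Metrics[i].Name == metricName {
				found = true
				break
			}
		}
		if !found {
			e.unregisterMetric(m.genericMetric)
			delete(metrics, fullName)
		}
	}
}
```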
+func (e *EncodeProm) checkConfUpdate() {
+	select {
+	case stage := <-e.updateChan:
+		cfg := api.PromEncode{}
+		if stage.Encode != nil && stage.Encode.Prom != nil {
+			cfg = *stage.Encode.Prom
+		}
+
+		e.cleanDeletedMetrics(cfg)
+
+		for i := range cfg.Metrics {
+			mCfg := &cfg.Metrics[i]
+			fullMetricName := cfg.Prefix + mCfg.Name
+			mInfo := CreateMetricInfo(mCfg)
+			switch mCfg.Type {
+			case api.MetricEncodeOperationName("Counter"):
+				if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok {
+					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
+						e.unregisterMetric(oldMetric.genericMetric)
+						e.addCounter(fullMetricName, mInfo)
+					}
+				} else {
+					// New metric
+					e.addCounter(fullMetricName, mInfo)
+				}
+			case api.MetricEncodeOperationName("Gauge"):
+				if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok {
+					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
+						e.unregisterMetric(oldMetric.genericMetric)
+						e.addGauge(fullMetricName, mInfo)
+					}
+				} else {
+					// New metric
+					e.addGauge(fullMetricName, mInfo)
+				}
+			case api.MetricEncodeOperationName("Histogram"):
+				if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok {
+					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
+						e.unregisterMetric(oldMetric.genericMetric)
+						e.addHistogram(fullMetricName, mInfo)
+					}
+				} else {
+					// New metric
+					e.addHistogram(fullMetricName, mInfo)
+				}
+			case api.MetricEncodeOperationName("AggHistogram"):
+				if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok {
+					if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) {
+						e.unregisterMetric(oldMetric.genericMetric)
+						e.addAgghistogram(fullMetricName, mInfo)
+					}
+				} else {
+					// New metric
+					e.addAgghistogram(fullMetricName, mInfo)
+				}
+			default:
+				log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
+				continue
+			}
+		}
+	default:
+		// Nothing to do
+		return
+	}
+}
+
 func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) {
 	cfg := api.PromEncode{}
 	if params.Encode != nil && params.Encode.Prom != nil {
@@ -126,6 +339,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 	w := &EncodeProm{
 		cfg:        params.Encode.Prom,
 		registerer: registerer,
+		updateChan: make(chan config.StageParam),
 	}
 
 	metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
@@ -146,7 +360,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 			log.Errorf("error during prometheus.Register: %v", err)
 			return nil, err
 		}
-		metricCommon.AddCounter(counter, mInfo)
+		metricCommon.AddCounter(fullMetricName, counter, mInfo)
 	case api.MetricEncodeOperationName("Gauge"):
 		gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
 		err := registerer.Register(gauge)
@@ -154,7 +368,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 			log.Errorf("error during prometheus.Register: %v", err)
 			return nil, err
 		}
-		metricCommon.AddGauge(gauge, mInfo)
+		metricCommon.AddGauge(fullMetricName, gauge, mInfo)
 	case api.MetricEncodeOperationName("Histogram"):
 		log.Debugf("buckets = %v", mCfg.Buckets)
 		hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
@@ -163,7 +377,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 			log.Errorf("error during prometheus.Register: %v", err)
 			return nil, err
 		}
-		metricCommon.AddHist(hist, mInfo)
+		metricCommon.AddHist(fullMetricName, hist, mInfo)
 	case api.MetricEncodeOperationName("AggHistogram"):
 		log.Debugf("buckets = %v", mCfg.Buckets)
 		hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
@@ -172,7 +386,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 			log.Errorf("error during prometheus.Register: %v", err)
 			return nil, err
 		}
-		metricCommon.AddAggHist(hist, mInfo)
+		metricCommon.AddAggHist(fullMetricName, hist, mInfo)
 	case "default":
 		log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
 		continue
@@ -180,3 +394,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
 	}
 	}
 	return w, nil
 }
+
+func (t *EncodeProm) Update(param config.StageParam) {
+	t.updateChan <- param
+}
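Update and checkConfUpdate form a simple handshake: updateChan is unbuffered, so Update blocks until the encoder's next Encode call drains the channel through the non-blocking select above. A condensed, self-contained illustration of the pattern (hypothetical names, not patch code):

```go
package main

import "fmt"

func main() {
	updates := make(chan string) // unbuffered, like EncodeProm.updateChan

	go func() { updates <- "new config" }() // watcher side: blocks until polled

	// Encoder side: non-blocking poll, as in checkConfUpdate.
	for i := 0; i < 3; i++ {
		select {
		case cfg := <-updates:
			fmt.Println("applying:", cfg)
		default:
			fmt.Println("no update pending")
		}
	}
}
```

A buffered channel (`make(chan config.StageParam, 1)`) would let the watcher fire and forget; whether that is preferable depends on whether updates may be dropped or coalesced while no flows are being encoded.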
diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go
index 0820a81b7..ea442bf7c 100644
--- a/pkg/pipeline/encode/encode_prom_test.go
+++ b/pkg/pipeline/encode/encode_prom_test.go
@@ -99,13 +99,13 @@ func Test_NewEncodeProm(t *testing.T) {
 	require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos))
 	require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime)
 
-	require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes")
+	require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Name, "Bytes")
 	expectedList := []string{"srcAddr", "dstAddr", "srcPort"}
-	require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList)
+	require.Equal(t, encodeProm.metricCommon.gauges["test_Bytes"].info.Labels, expectedList)
 
-	require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets")
+	require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Name, "Packets")
 	expectedList = []string{"srcAddr", "dstAddr", "dstPort"}
-	require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList)
+	require.Equal(t, encodeProm.metricCommon.counters["test_Packets"].info.Labels, expectedList)
 
 	entry := test.GetExtractMockEntry()
 	encodeProm.Encode(entry)
diff --git a/pkg/pipeline/encode/encode_s3.go b/pkg/pipeline/encode/encode_s3.go
index 9b7bb1267..a9fb62992 100644
--- a/pkg/pipeline/encode/encode_s3.go
+++ b/pkg/pipeline/encode/encode_s3.go
@@ -109,6 +109,10 @@ func (s *encodeS3) GenerateStoreHeader(flows []config.GenericMap, startTime time
 	return object
 }
 
+func (t *encodeS3) Update(_ config.StageParam) {
+	log.Warn("Encode S3, update not supported")
+}
+
 func (s *encodeS3) createObjectTimeoutLoop() {
 	log.Debugf("entering createObjectTimeoutLoop")
 	ticker := time.NewTicker(s.s3Params.WriteTimeout.Duration)
@@ -215,3 +219,7 @@ func (e *encodeS3Writer) putObject(bucket string, objectName string, object map[
 	log.Debugf("uploadInfo = %v", uploadInfo)
 	return err
 }
+
+func (t *encodeS3Writer) Update(_ config.StageParam) {
+	log.Warn("Encode S3 Writer, update not supported")
+}
diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go
index 28655d959..afa384ad8 100644
--- a/pkg/pipeline/encode/metrics_common.go
+++ b/pkg/pipeline/encode/metrics_common.go
@@ -37,10 +37,10 @@ type mInfoStruct struct {
 }
 
 type MetricsCommonStruct struct {
-	gauges    []mInfoStruct
-	counters  []mInfoStruct
-	histos    []mInfoStruct
-	aggHistos []mInfoStruct
+	gauges    map[string]mInfoStruct
+	counters  map[string]mInfoStruct
+	histos    map[string]mInfoStruct
+	aggHistos map[string]mInfoStruct
 	mCache           *putils.TimedCache
 	mChacheLenMetric prometheus.Gauge
 	metricsProcessed prometheus.Counter
@@ -85,24 +85,24 @@ var (
 	)
 )
 
-func (m *MetricsCommonStruct) AddCounter(g interface{}, info *MetricInfo) {
+func (m *MetricsCommonStruct) AddCounter(name string, g interface{}, info *MetricInfo) {
 	mStruct := mInfoStruct{genericMetric: g, info: info}
-	m.counters = append(m.counters, mStruct)
+	m.counters[name] = mStruct
 }
 
-func (m *MetricsCommonStruct) AddGauge(g interface{}, info *MetricInfo) {
+func (m *MetricsCommonStruct) AddGauge(name string, g interface{}, info *MetricInfo) {
 	mStruct := mInfoStruct{genericMetric: g, info: info}
-	m.gauges = append(m.gauges, mStruct)
+	m.gauges[name] = mStruct
 }
 
-func (m *MetricsCommonStruct) AddHist(g interface{}, info *MetricInfo) {
+func (m *MetricsCommonStruct) AddHist(name string, g interface{}, info *MetricInfo) {
 	mStruct := mInfoStruct{genericMetric: g, info: info}
-	m.histos = append(m.histos, mStruct)
+	m.histos[name] = mStruct
 }
 
-func (m *MetricsCommonStruct) AddAggHist(g interface{}, info *MetricInfo) {
+func (m *MetricsCommonStruct) AddAggHist(name string, g interface{}, info *MetricInfo) {
 	mStruct := mInfoStruct{genericMetric: g, info: info}
-	m.aggHistos = append(m.aggHistos, mStruct)
+	m.aggHistos[name] = mStruct
 }
 
 func (m *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, metricRecord config.GenericMap) {
@@ -273,6 +273,10 @@ func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int,
 		errorsCounter: opMetrics.NewCounterVec(&encodePromErrors),
 		expiryTime:    expiryTime.Duration,
 		exitChan:      putils.ExitChannel(),
+		gauges:        map[string]mInfoStruct{},
+		counters:      map[string]mInfoStruct{},
+		histos:        map[string]mInfoStruct{},
+		aggHistos:     map[string]mInfoStruct{},
 	}
 	go m.cleanupExpiredEntriesLoop(callback)
 	return m
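Because the registries are now keyed by the full, prefix-qualified metric name (as the updated tests show with "test_Bytes" and "test_Packets"), lookups and overwrites are O(1), at the cost of randomized iteration order. A lookup sketch, assuming the prefix "test_" from encode_prom_test.go:

```go
if m, ok := encodeProm.metricCommon.counters["test_Packets"]; ok {
	fmt.Println(m.info.Name, m.info.Labels) // "Packets" [srcAddr dstAddr dstPort]
}
```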
diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
index 00ee43370..7af924528 100644
--- a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
+++ b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
@@ -42,6 +42,10 @@ func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) {
 	e.LogWrite(entry)
 }
 
+func (t *EncodeOtlpLogs) Update(_ config.StageParam) {
+	log.Warn("EncodeOtlpLogs, update not supported")
+}
+
 func NewEncodeOtlpLogs(_ *operational.Metrics, params config.StageParam) (encode.Encoder, error) {
 	log.Tracef("entering NewEncodeOtlpLogs \n")
 	cfg := api.EncodeOtlpLogs{}
diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
index 3ebd84416..a622a4af7 100644
--- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
+++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
@@ -45,6 +45,10 @@ type EncodeOtlpMetrics struct {
 	metricCommon *encode.MetricsCommonStruct
 }
 
+func (t *EncodeOtlpMetrics) Update(_ config.StageParam) {
+	log.Warn("EncodeOtlpMetrics, update not supported")
+}
+
 // Encode encodes a metric to be exported
 func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {
 	log.Tracef("entering EncodeOtlpMetrics. entry = %v", metricRecord)
@@ -140,7 +144,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
 			log.Errorf("error during counter creation: %v", err)
 			return nil, err
 		}
-		metricCommon.AddCounter(counter, mInfo)
+		metricCommon.AddCounter(fullMetricName, counter, mInfo)
 	case api.MetricEncodeOperationName("Gauge"):
 		// at implementation time, only asynchronous gauges are supported by otel in golang
 		obs := Float64Gauge{observations: make(map[string]Float64GaugeEntry)}
@@ -152,7 +156,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
 			log.Errorf("error during gauge creation: %v", err)
 			return nil, err
 		}
-		metricCommon.AddGauge(gauge, mInfo)
+		metricCommon.AddGauge(fullMetricName, gauge, mInfo)
 	case api.MetricEncodeOperationName("Histogram"):
 		var histo metric.Float64Histogram
 		if len(mCfg.Buckets) == 0 {
@@ -167,7 +171,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
 			log.Errorf("error during histogram creation: %v", err)
 			return nil, err
 		}
-		metricCommon.AddHist(histo, mInfo)
+		metricCommon.AddHist(fullMetricName, histo, mInfo)
 	case "default":
 		log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
 		continue
diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go
index 4930c4e51..d04339d7d 100644
--- a/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go
+++ b/pkg/pipeline/encode/opentelemetry/encode_otlptrace.go
@@ -91,6 +91,10 @@ OUTER:
 	}
 }
 
+func (t *EncodeOtlpTrace) Update(_ config.StageParam) {
+	log.Warn("EncodeOtlpTrace, update not supported")
+}
+
 func NewEncodeOtlpTraces(_ *operational.Metrics, params config.StageParam) (encode.Encoder, error) {
 	log.Tracef("entering NewEncodeOtlpTraces \n")
 	cfg := api.EncodeOtlpTraces{}
diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go
index 12cadfce5..35cde905a 100644
--- a/pkg/pipeline/pipeline.go
+++ b/pkg/pipeline/pipeline.go
@@ -38,13 +38,15 @@ const (
 
 // Pipeline manager
 type Pipeline struct {
-	startNodes    []*node.Start[config.GenericMap]
-	terminalNodes []*node.Terminal[config.GenericMap]
-	IsRunning     bool
+	startNodes       []*node.Start[config.GenericMap]
+	terminalNodes    []*node.Terminal[config.GenericMap]
+	pipelineEntryMap map[string]*pipelineEntry
+	IsRunning        bool
 	// TODO: this field is only used for test verification. We should rewrite the build process
 	// to be able to remove it from here
 	pipelineStages []*pipelineEntry
 	Metrics        *operational.Metrics
+	configWatcher  *pipelineConfigWatcher
 }
 
 // NewPipeline defines the pipeline elements
@@ -66,7 +68,12 @@ func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester)
 	if err := builder.readStages(); err != nil {
 		return nil, err
 	}
-	return builder.build()
+	pipeline, err := builder.build()
+	if err != nil {
+		return nil, err
+	}
+	pipeline.configWatcher, err = newPipelineConfigWatcher(cfg, pipeline.pipelineEntryMap)
+	return pipeline, err
 }
 
 func (p *Pipeline) Run() {
@@ -76,6 +83,10 @@ func (p *Pipeline) Run() {
 	}
 	p.IsRunning = true
 
+	if p.configWatcher != nil {
+		go p.configWatcher.Run()
+	}
+
 	// blocking the execution until the graph terminal stages end
 	for _, t := range p.terminalNodes {
 		<-t.Done()
diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go
index 5eddfce41..487bee286 100644
--- a/pkg/pipeline/pipeline_builder.go
+++ b/pkg/pipeline/pipeline_builder.go
@@ -56,6 +56,7 @@ type builder struct {
 	batchMaxLen   int
 	batchTimeout  time.Duration
 	nodeBufferLen int
+	updtChans     map[string]chan config.StageParam
 }
 
 type pipelineEntry struct {
@@ -96,6 +97,7 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder {
 		batchMaxLen:   bl,
 		batchTimeout:  bt,
 		nodeBufferLen: nb,
+		updtChans:     map[string]chan config.StageParam{},
 	}
 }
 
@@ -221,10 +223,11 @@ func (b *builder) build() (*Pipeline, error) {
 		return nil, errors.New("no writers have been defined")
 	}
 	return &Pipeline{
-		startNodes:     b.startNodes,
-		terminalNodes:  b.terminalNodes,
-		pipelineStages: b.pipelineStages,
-		Metrics:        b.opMetrics,
+		startNodes:       b.startNodes,
+		terminalNodes:    b.terminalNodes,
+		pipelineStages:   b.pipelineStages,
+		pipelineEntryMap: b.pipelineEntryMap,
+		Metrics:          b.opMetrics,
 	}, nil
 }
diff --git a/pkg/pipeline/pipeline_watcher.go b/pkg/pipeline/pipeline_watcher.go
new file mode 100644
index 000000000..bf3e7fc52
--- /dev/null
+++ b/pkg/pipeline/pipeline_watcher.go
@@ -0,0 +1,110 @@
+package pipeline
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/utils"
+	log "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+)
+
+type pipelineConfigWatcher struct {
+	clientSet        kubernetes.Clientset
+	cmName           string
+	cmNamespace      string
+	configFile       string
+	pipelineEntryMap map[string]*pipelineEntry
+}
+
+func newPipelineConfigWatcher(cfg *config.ConfigFileStruct, pipelineEntryMap map[string]*pipelineEntry) (*pipelineConfigWatcher, error) {
+	if cfg.DynamicParameters.Name == "" ||
+		cfg.DynamicParameters.Namespace == "" ||
+		cfg.DynamicParameters.FileName == "" {
+		return nil, nil
+	}
+
+	restCfg, err := utils.LoadK8sConfig(cfg.DynamicParameters.KubeConfigPath)
+	if err != nil {
+		return nil, err
+	}
+
+	clientset, err := kubernetes.NewForConfig(restCfg)
+	if err != nil {
+		return nil, err
+	}
+	pipelineCW := pipelineConfigWatcher{
+		clientSet:        *clientset,
+		pipelineEntryMap: pipelineEntryMap,
+		cmName:           cfg.DynamicParameters.Name,
+		cmNamespace:      cfg.DynamicParameters.Namespace,
+		configFile:       cfg.DynamicParameters.FileName,
+	}
+
+	return &pipelineCW, nil
+}
+
+func (pcw *pipelineConfigWatcher) Run() {
+	for {
+		watcher, err := pcw.clientSet.CoreV1().ConfigMaps(pcw.cmNamespace).Watch(context.TODO(),
+			metav1.SingleObject(metav1.ObjectMeta{Name: pcw.cmName, Namespace: pcw.cmNamespace}))
+		if err != nil {
+			panic("Unable to create watcher")
+		}
+		pcw.handleEvent(watcher.ResultChan())
+	}
+}
+
+func (pcw *pipelineConfigWatcher) handleEvent(eventChannel <-chan watch.Event) {
+	for {
+		event, open := <-eventChannel
+		if !open {
+			// the channel was closed: the server closed the connection, so
+			// return and let Run re-create the watch
+			return
+		}
+		switch event.Type {
+		case watch.Added, watch.Modified:
+			// apply the updated ConfigMap to the pipeline stages
+			if updatedMap, ok := event.Object.(*corev1.ConfigMap); ok {
+				pcw.updateFromConfigmap(updatedMap)
+			}
+		default:
+			// nothing to do for Deleted and other event types
+		}
+	}
+}
+
+func (pcw *pipelineConfigWatcher) updateFromConfigmap(cm *corev1.ConfigMap) {
+	if rawConfig, ok := cm.Data[pcw.configFile]; ok {
+		hotReloadCfg := config.ConfigHotReloadStruct{}
+		err := json.Unmarshal([]byte(rawConfig), &hotReloadCfg)
+		if err != nil {
+			log.Errorf("Cannot parse config: %v", err)
+			return
+		}
+		for _, param := range hotReloadCfg.Parameters {
+			if pentry, ok := pcw.pipelineEntryMap[param.Name]; ok {
+				pcw.updateEntry(*pentry, param)
+			}
+		}
+	}
+}
+
+func (pcw *pipelineConfigWatcher) updateEntry(pEntry pipelineEntry, param config.StageParam) {
+	switch pEntry.stageType {
+	case StageEncode:
+		pEntry.Encoder.Update(param)
+	default:
+		log.Warningf("Hot reloading not supported for: %s", pEntry.stageType)
+	}
+}
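For illustration, the watcher expects the hot-reload document as JSON under the configured key of the watched ConfigMap. A sketch of a matching object follows; the names and the exact parameter payload are hypothetical, the document only needs to unmarshal into config.ConfigHotReloadStruct, and parameter names must match existing stage names to be applied:

```go
cm := corev1.ConfigMap{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "flp-config-dynamic", // cfg.DynamicParameters.Name
		Namespace: "netobserv",          // cfg.DynamicParameters.Namespace
	},
	Data: map[string]string{
		// key = cfg.DynamicParameters.FileName
		"dynamic.json": `{"parameters":[{"name":"encode_prom","encode":{"type":"prom","prom":{"prefix":"flp_","metrics":[]}}}]}`,
	},
}
```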
diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go
index 66048a454..4d3fd80c9 100644
--- a/pkg/pipeline/transform/kubernetes/informers/informers.go
+++ b/pkg/pipeline/transform/kubernetes/informers/informers.go
@@ -20,11 +20,10 @@ package informers
 
 import (
 	"fmt"
 	"net"
-	"os"
-	"path"
 	"time"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni"
+	"github.com/netobserv/flowlogs-pipeline/pkg/utils"
 	"github.com/sirupsen/logrus"
 
 	v1 "k8s.io/api/core/v1"
@@ -34,9 +33,7 @@
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/metadata"
 	"k8s.io/client-go/metadata/metadatainformer"
-	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/clientcmd"
 )
 
 const (
@@ -339,7 +336,7 @@ func (k *Informers) InitFromConfig(kubeConfigPath string) error {
 	k.stopChan = make(chan struct{})
 	k.mdStopChan = make(chan struct{})
 
-	config, err := loadConfig(kubeConfigPath)
+	config, err := utils.LoadK8sConfig(kubeConfigPath)
 	if err != nil {
 		return err
 	}
@@ -362,33 +359,6 @@ func (k *Informers) InitFromConfig(kubeConfigPath string) error {
 	return nil
 }
 
-func loadConfig(kubeConfigPath string) (*rest.Config, error) {
-	// if no config path is provided, load it from the env variable
-	if kubeConfigPath == "" {
-		kubeConfigPath = os.Getenv(kubeConfigEnvVariable)
-	}
-	// otherwise, load it from the $HOME/.kube/config file
-	if kubeConfigPath == "" {
-		homeDir, err := os.UserHomeDir()
-		if err != nil {
-			return nil, fmt.Errorf("can't get user home dir: %w", err)
-		}
-		kubeConfigPath = path.Join(homeDir, ".kube", "config")
-	}
-	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
-	if err == nil {
-		return config, nil
-	}
-	// fallback: use in-cluster config
-	config, err = rest.InClusterConfig()
-	if err != nil {
-		return nil, fmt.Errorf("can't access kubenetes. Tried using config from: "+
-			"config parameter, %s env, homedir and InClusterConfig. Got: %w",
-			kubeConfigEnvVariable, err)
-	}
-	return config, nil
-}
-
 func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error {
 	informerFactory := inf.NewSharedInformerFactory(client, syncTime)
 	metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime)
diff --git a/pkg/utils/kubernetes.go b/pkg/utils/kubernetes.go
new file mode 100644
index 000000000..3503492ed
--- /dev/null
+++ b/pkg/utils/kubernetes.go
@@ -0,0 +1,41 @@
+package utils
+
+import (
+	"fmt"
+	"os"
+	"path"
+
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+const (
+	kubeConfigEnvVariable = "KUBECONFIG"
+)
+
+func LoadK8sConfig(kubeConfigPath string) (*rest.Config, error) {
+	// if no config path is provided, take it from the KUBECONFIG env variable
+	if kubeConfigPath == "" {
+		kubeConfigPath = os.Getenv(kubeConfigEnvVariable)
+	}
+	// if still empty, fall back to the $HOME/.kube/config file
+	if kubeConfigPath == "" {
+		homeDir, err := os.UserHomeDir()
+		if err != nil {
+			return nil, fmt.Errorf("can't get user home dir: %w", err)
+		}
+		kubeConfigPath = path.Join(homeDir, ".kube", "config")
+	}
+	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
+	if err == nil {
+		return config, nil
+	}
+	// fallback: use in-cluster config
+	config, err = rest.InClusterConfig()
+	if err != nil {
+		return nil, fmt.Errorf("can't access kubernetes. Tried using config from: "+
+			"config parameter, %s env, homedir and InClusterConfig. Got: %w",
+			kubeConfigEnvVariable, err)
+	}
+	return config, nil
+}
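The extracted helper keeps the previous lookup order: explicit path, then KUBECONFIG, then ~/.kube/config, then in-cluster config. Callers outside the informers follow the same pattern as the config watcher above; a minimal usage sketch:

```go
restCfg, err := utils.LoadK8sConfig("") // empty: fall back through env/home/in-cluster
if err != nil {
	return err
}
clientset, err := kubernetes.NewForConfig(restCfg)
if err != nil {
	return err
}
_ = clientset // use clientset.CoreV1() etc.
```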