diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index 62fae729f..eb618bb63 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -202,7 +202,7 @@ func run() { }, } tlsConfig := cfg.MetricsSettings.TLS - go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic) + go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry)) // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index c557ec031..530aed26f 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -23,6 +23,7 @@ type PromTLSConf struct { } type PromEncode struct { + *PromConnectionInfo Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"` Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"` ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting prometheus data item"` @@ -40,6 +41,12 @@ func PromEncodeOperationName(operation string) string { return GetEnumName(PromEncodeOperationEnum{}, operation) } +type PromConnectionInfo struct { + Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"` + Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"endpoint port number to expose"` + TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the endpoint"` +} + type PromMetricsItem struct { Name string `yaml:"name" json:"name" doc:"the metric name"` Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 3f0d43a7f..7b49c9a61 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -68,8 +68,8 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { Pipeline: pipeline.GetStages(), Parameters: pipeline.GetStageParams(), MetricsSettings: config.MetricsSettings{ - Port: 9102, - Prefix: "flp_op_", + PromConnectionInfo: &api.PromConnectionInfo{Port: 9102}, + Prefix: "flp_op_", }, } } diff --git a/pkg/config/config.go b/pkg/config/config.go index d68139a5e..c9e5efcf2 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -55,12 +55,10 @@ type Profile struct { // Also, currently FLP doesn't support defining more than one PromEncode stage. If this feature is added later, these global settings // will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides. type MetricsSettings struct { - Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"address to expose \"/metrics\" endpoint"` - Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"` - TLS *api.PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"` - Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` - NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` - SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` + *api.PromConnectionInfo + Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"` + NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"` + SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"` } // PerfSettings allows setting some internal configuration parameters diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f13cf093c..f4812140b 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,7 +18,9 @@ package encode import ( + "crypto/tls" "fmt" + "net/http" "strings" "time" @@ -50,6 +52,8 @@ type histoInfo struct { } type EncodeProm struct { + cfg *api.PromEncode + registerer prometheus.Registerer gauges []gaugeInfo counters []counterInfo histos []histoInfo @@ -270,6 +274,26 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En } log.Debugf("expiryTime = %v", expiryTime) + var registerer prometheus.Registerer + + if cfg.PromConnectionInfo != nil { + registry := prometheus.NewRegistry() + registerer = registry + addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port) + log.Infof("startServer: addr = %s", addr) + promServer := &http.Server{ + Addr: addr, + // TLS clients must use TLS 1.2 or higher + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + } + tlsConfig := cfg.PromConnectionInfo.TLS + go putils.StartPromServer(tlsConfig, promServer, true, registry) + } else { + registerer = prometheus.DefaultRegisterer + } + counters := []counterInfo{} gauges := []gaugeInfo{} histos := []histoInfo{} @@ -284,7 +308,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En switch mCfg.Type { case api.PromEncodeOperationName("Counter"): counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels) - err := prometheus.Register(counter) + err := registerer.Register(counter) if err != nil { log.Errorf("error during prometheus.Register: %v", err) return nil, err @@ -295,7 +319,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En }) case api.PromEncodeOperationName("Gauge"): gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels) - err := prometheus.Register(gauge) + err := registerer.Register(gauge) if err != nil { log.Errorf("error during prometheus.Register: %v", err) return nil, err @@ -307,7 +331,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En case api.PromEncodeOperationName("Histogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) - err := prometheus.Register(hist) + err := registerer.Register(hist) if err != nil { log.Errorf("error during prometheus.Register: %v", err) return nil, err @@ -319,7 +343,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En case api.PromEncodeOperationName("AggHistogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) - err := prometheus.Register(hist) + err := registerer.Register(hist) if err != nil { log.Errorf("error during prometheus.Register: %v", err) return nil, err @@ -342,6 +366,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name) w := &EncodeProm{ + cfg: params.Encode.Prom, + registerer: registerer, counters: counters, gauges: gauges, histos: histos, diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index b3332212c..ed4b1f6a5 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -578,3 +578,75 @@ func BenchmarkPromEncode(b *testing.B) { } } } + +const testConfigMultiple = `--- +log-level: debug +pipeline: + - name: encode1 + - name: encode2 + - name: encode3 +parameters: + - name: encode1 + encode: + type: prom + prom: + prefix: test1_ + metrics: + - name: Bytes + type: gauge + valueKey: bytes + labels: + - srcAddr + - name: encode2 + encode: + type: prom + prom: + prefix: test2_ + metrics: + - name: Packets + type: counter + valueKey: packets + labels: + - dstAddr + - dstPort + - name: encode3 + encode: + type: prom + prom: + prefix: test3_ + metrics: + - name: subnetHistogram + type: histogram + valueKey: aggregate + labels: +` + +func Test_MultipleProm(t *testing.T) { + v, cfg := test.InitConfig(t, testConfigMultiple) + require.NotNil(t, v) + encodeProm1, err := initProm(cfg.Parameters[0].Encode.Prom) + require.NoError(t, err) + encodeProm2, err := initProm(cfg.Parameters[1].Encode.Prom) + require.NoError(t, err) + encodeProm3, err := initProm(cfg.Parameters[2].Encode.Prom) + require.NoError(t, err) + + require.Equal(t, 0, len(encodeProm1.counters)) + require.Equal(t, 1, len(encodeProm1.gauges)) + require.Equal(t, 0, len(encodeProm1.histos)) + require.Equal(t, 0, len(encodeProm1.aggHistos)) + require.Equal(t, 1, len(encodeProm2.counters)) + require.Equal(t, 0, len(encodeProm2.gauges)) + require.Equal(t, 0, len(encodeProm2.histos)) + require.Equal(t, 0, len(encodeProm2.aggHistos)) + require.Equal(t, 0, len(encodeProm3.counters)) + require.Equal(t, 0, len(encodeProm3.gauges)) + require.Equal(t, 1, len(encodeProm3.histos)) + require.Equal(t, 0, len(encodeProm3.aggHistos)) + + require.Equal(t, "test1_", encodeProm1.cfg.Prefix) + require.Equal(t, "test2_", encodeProm2.cfg.Prefix) + require.Equal(t, "test3_", encodeProm3.cfg.Prefix) + + // TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe) +} diff --git a/pkg/pipeline/utils/prom_server.go b/pkg/pipeline/utils/prom_server.go index 51cd8eb59..17d0e8c7d 100644 --- a/pkg/pipeline/utils/prom_server.go +++ b/pkg/pipeline/utils/prom_server.go @@ -22,17 +22,20 @@ import ( "os" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" ) // StartPromServer listens for prometheus resource usage requests -func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool) { +func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) { logrus.Debugf("entering StartPromServer") // The Handler function provides a default handler to expose metrics // via an HTTP server. "/metrics" is the usual endpoint for that. - http.Handle("/metrics", promhttp.Handler()) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + server.Handler = mux var err error if tlsConfig != nil {