Skip to content

Commit

Permalink
allow multiple prometheus endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Nov 12, 2023
1 parent 06ac688 commit 28a06bc
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func run() {
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic)
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type PromTLSConf struct {
}

type PromEncode struct {
*PromConnectionInfo
Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting prometheus data item"`
Expand All @@ -40,6 +41,12 @@ func PromEncodeOperationName(operation string) string {
return GetEnumName(PromEncodeOperationEnum{}, operation)
}

type PromConnectionInfo struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"endpoint port number to expose"`
TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the endpoint"`
}

type PromMetricsItem struct {
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
MetricsSettings: config.MetricsSettings{
Port: 9102,
Prefix: "flp_op_",
PromConnectionInfo: &api.PromConnectionInfo{Port: 9102},
Prefix: "flp_op_",
},
}
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ type Profile struct {
// Also, currently FLP doesn't support defining more than one PromEncode stage. If this feature is added later, these global settings
// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides.
type MetricsSettings struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"address to expose \"/metrics\" endpoint"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
TLS *api.PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
*api.PromConnectionInfo
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
}

// PerfSettings allows setting some internal configuration parameters
Expand Down
34 changes: 30 additions & 4 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -50,6 +52,8 @@ type histoInfo struct {
}

type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
gauges []gaugeInfo
counters []counterInfo
histos []histoInfo
Expand Down Expand Up @@ -270,6 +274,26 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
}
log.Debugf("expiryTime = %v", expiryTime)

var registerer prometheus.Registerer

if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
} else {
registerer = prometheus.DefaultRegisterer
}

counters := []counterInfo{}
gauges := []gaugeInfo{}
histos := []histoInfo{}
Expand All @@ -284,7 +308,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
switch mCfg.Type {
case api.PromEncodeOperationName("Counter"):
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(counter)
err := registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -295,7 +319,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
})
case api.PromEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(gauge)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -307,7 +331,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
case api.PromEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -319,7 +343,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
case api.PromEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -342,6 +366,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name)

w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
counters: counters,
gauges: gauges,
histos: histos,
Expand Down
72 changes: 72 additions & 0 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,75 @@ func BenchmarkPromEncode(b *testing.B) {
}
}
}

const testConfigMultiple = `---
log-level: debug
pipeline:
- name: encode1
- name: encode2
- name: encode3
parameters:
- name: encode1
encode:
type: prom
prom:
prefix: test1_
metrics:
- name: Bytes
type: gauge
valueKey: bytes
labels:
- srcAddr
- name: encode2
encode:
type: prom
prom:
prefix: test2_
metrics:
- name: Packets
type: counter
valueKey: packets
labels:
- dstAddr
- dstPort
- name: encode3
encode:
type: prom
prom:
prefix: test3_
metrics:
- name: subnetHistogram
type: histogram
valueKey: aggregate
labels:
`

func Test_MultipleProm(t *testing.T) {
v, cfg := test.InitConfig(t, testConfigMultiple)
require.NotNil(t, v)
encodeProm1, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)
encodeProm2, err := initProm(cfg.Parameters[1].Encode.Prom)
require.NoError(t, err)
encodeProm3, err := initProm(cfg.Parameters[2].Encode.Prom)
require.NoError(t, err)

require.Equal(t, 0, len(encodeProm1.counters))
require.Equal(t, 1, len(encodeProm1.gauges))
require.Equal(t, 0, len(encodeProm1.histos))
require.Equal(t, 0, len(encodeProm1.aggHistos))
require.Equal(t, 1, len(encodeProm2.counters))
require.Equal(t, 0, len(encodeProm2.gauges))
require.Equal(t, 0, len(encodeProm2.histos))
require.Equal(t, 0, len(encodeProm2.aggHistos))
require.Equal(t, 0, len(encodeProm3.counters))
require.Equal(t, 0, len(encodeProm3.gauges))
require.Equal(t, 1, len(encodeProm3.histos))
require.Equal(t, 0, len(encodeProm3.aggHistos))

require.Equal(t, "test1_", encodeProm1.cfg.Prefix)
require.Equal(t, "test2_", encodeProm2.cfg.Prefix)
require.Equal(t, "test3_", encodeProm3.cfg.Prefix)

// TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe)
}
7 changes: 5 additions & 2 deletions pkg/pipeline/utils/prom_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ import (
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

// StartPromServer listens for prometheus resource usage requests
func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool) {
func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) {
logrus.Debugf("entering StartPromServer")

// The Handler function provides a default handler to expose metrics
// via an HTTP server. "/metrics" is the usual endpoint for that.
http.Handle("/metrics", promhttp.Handler())
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
server.Handler = mux

var err error
if tlsConfig != nil {
Expand Down

0 comments on commit 28a06bc

Please sign in to comment.