Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple prometheus stages with different URLs #532

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func run() {
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic)
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type PromTLSConf struct {
}

type PromEncode struct {
*PromConnectionInfo
Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting prometheus data item"`
Expand All @@ -40,6 +41,12 @@ func PromEncodeOperationName(operation string) string {
return GetEnumName(PromEncodeOperationEnum{}, operation)
}

type PromConnectionInfo struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"endpoint port number to expose"`
TLS *PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the endpoint"`
}

type PromMetricsItem struct {
Name string `yaml:"name" json:"name" doc:"the metric name"`
Type string `yaml:"type" json:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
MetricsSettings: config.MetricsSettings{
Port: 9102,
Prefix: "flp_op_",
PromConnectionInfo: &api.PromConnectionInfo{Port: 9102},
Prefix: "flp_op_",
},
}
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ type Profile struct {
// Also, currently FLP doesn't support defining more than one PromEncode stage. If this feature is added later, these global settings
// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides.
type MetricsSettings struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"address to expose \"/metrics\" endpoint"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
TLS *api.PromTLSConf `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration for the prometheus endpoint"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
*api.PromConnectionInfo
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
}

// PerfSettings allows setting some internal configuration parameters
Expand Down
34 changes: 30 additions & 4 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -50,6 +52,8 @@ type histoInfo struct {
}

type EncodeProm struct {
cfg *api.PromEncode
registerer prometheus.Registerer
gauges []gaugeInfo
counters []counterInfo
histos []histoInfo
Expand Down Expand Up @@ -270,6 +274,26 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
}
log.Debugf("expiryTime = %v", expiryTime)

var registerer prometheus.Registerer

if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
} else {
registerer = prometheus.DefaultRegisterer
}

counters := []counterInfo{}
gauges := []gaugeInfo{}
histos := []histoInfo{}
Expand All @@ -284,7 +308,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
switch mCfg.Type {
case api.PromEncodeOperationName("Counter"):
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(counter)
err := registerer.Register(counter)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -295,7 +319,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
})
case api.PromEncodeOperationName("Gauge"):
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels)
err := prometheus.Register(gauge)
err := registerer.Register(gauge)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -307,7 +331,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
case api.PromEncodeOperationName("Histogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -319,7 +343,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
case api.PromEncodeOperationName("AggHistogram"):
log.Debugf("buckets = %v", mCfg.Buckets)
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels)
err := prometheus.Register(hist)
err := registerer.Register(hist)
if err != nil {
log.Errorf("error during prometheus.Register: %v", err)
return nil, err
Expand All @@ -342,6 +366,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name)

w := &EncodeProm{
cfg: params.Encode.Prom,
registerer: registerer,
counters: counters,
gauges: gauges,
histos: histos,
Expand Down
72 changes: 72 additions & 0 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,75 @@ func BenchmarkPromEncode(b *testing.B) {
}
}
}

const testConfigMultiple = `---
log-level: debug
pipeline:
- name: encode1
- name: encode2
- name: encode3
parameters:
- name: encode1
encode:
type: prom
prom:
prefix: test1_
metrics:
- name: Bytes
type: gauge
valueKey: bytes
labels:
- srcAddr
- name: encode2
encode:
type: prom
prom:
prefix: test2_
metrics:
- name: Packets
type: counter
valueKey: packets
labels:
- dstAddr
- dstPort
- name: encode3
encode:
type: prom
prom:
prefix: test3_
metrics:
- name: subnetHistogram
type: histogram
valueKey: aggregate
labels:
`

func Test_MultipleProm(t *testing.T) {
v, cfg := test.InitConfig(t, testConfigMultiple)
require.NotNil(t, v)
encodeProm1, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)
encodeProm2, err := initProm(cfg.Parameters[1].Encode.Prom)
require.NoError(t, err)
encodeProm3, err := initProm(cfg.Parameters[2].Encode.Prom)
require.NoError(t, err)

require.Equal(t, 0, len(encodeProm1.counters))
require.Equal(t, 1, len(encodeProm1.gauges))
require.Equal(t, 0, len(encodeProm1.histos))
require.Equal(t, 0, len(encodeProm1.aggHistos))
require.Equal(t, 1, len(encodeProm2.counters))
require.Equal(t, 0, len(encodeProm2.gauges))
require.Equal(t, 0, len(encodeProm2.histos))
require.Equal(t, 0, len(encodeProm2.aggHistos))
require.Equal(t, 0, len(encodeProm3.counters))
require.Equal(t, 0, len(encodeProm3.gauges))
require.Equal(t, 1, len(encodeProm3.histos))
require.Equal(t, 0, len(encodeProm3.aggHistos))

require.Equal(t, "test1_", encodeProm1.cfg.Prefix)
require.Equal(t, "test2_", encodeProm2.cfg.Prefix)
require.Equal(t, "test3_", encodeProm3.cfg.Prefix)

// TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe)
}
7 changes: 5 additions & 2 deletions pkg/pipeline/utils/prom_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ import (
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

// StartPromServer listens for prometheus resource usage requests
func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool) {
func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) {
logrus.Debugf("entering StartPromServer")

// The Handler function provides a default handler to expose metrics
// via an HTTP server. "/metrics" is the usual endpoint for that.
http.Handle("/metrics", promhttp.Handler())
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
server.Handler = mux

var err error
if tlsConfig != nil {
Expand Down
Loading