Skip to content

Commit

Permalink
Allow to start FLP directly from the flow logs producer (#538)
Browse files Browse the repository at this point in the history
* Allow to start FLP directly from the flow logs producer

- New "InProcess" ingest stage
- New entry point to start the whole FLP in-process: pipeline.StartFLPInProcess
- Add some tests

* Add builder functions ToConfig / IntoConfig

* Allow disabling operational metrics

* Add log when prom disabled

* InProcess ingest now takes in channel as argument, directly with the
GenericMap
  • Loading branch information
jotak authored Nov 23, 2023
1 parent 7c78d63 commit f3b03fa
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 108 deletions.
29 changes: 5 additions & 24 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -35,7 +34,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -182,27 +181,7 @@ func run() {

// Setup (threads) exit manager
utils.SetupElegantExit()

// set up private prometheus registry
if cfg.MetricsSettings.SuppressGoMetrics {
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

// create prometheus server for operational metrics
// if value of address is empty, then by default it will take 0.0.0.0
addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand All @@ -225,7 +204,9 @@ func run() {
// Starts the flows pipeline
mainPipeline.Run()

_ = promServer.Shutdown(context.Background())
if promServer != nil {
_ = promServer.Shutdown(context.Background())
}

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
Expand Down
8 changes: 3 additions & 5 deletions pkg/confgen/flowlogs2metrics_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
if cg.config.Write.Loki != nil {
forkedNode.WriteLoki("write_loki", *cg.config.Write.Loki)
}
return &config.ConfigFileStruct{
LogLevel: "error",
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
return pipeline.IntoConfigFileStruct(&config.ConfigFileStruct{
LogLevel: "error",
MetricsSettings: config.MetricsSettings{
PromConnectionInfo: api.PromConnectionInfo{Port: 9102},
Prefix: "flp_op_",
},
}
})
}

func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ type Profile struct {
// will help configuring common setting for all PromEncode stages - PromEncode settings would then act as overrides.
type MetricsSettings struct {
api.PromConnectionInfo
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
DisableGlobalServer bool `yaml:"disableGlobalServer,omitempty" json:"disableGlobalServer,omitempty" doc:"disabling the global metrics server makes operational metrics unavailable. If prometheus-encoding stages are defined, they need to contain their own metrics server parameters."`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix for names of the operational metrics"`
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
SuppressGoMetrics bool `yaml:"suppressGoMetrics,omitempty" json:"suppressGoMetrics,omitempty" doc:"filter out Go and process metrics"`
}

// PerfSettings allows setting some internal configuration parameters
Expand Down
23 changes: 23 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PipelineBuilderStage struct {
pipeline *pipeline
}

const PresetIngesterStage = "preset-ingester"

// NewPipeline creates a new pipeline from an existing ingest
func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) {
if ingest.Collector != nil {
Expand Down Expand Up @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage
return PipelineBuilderStage{pipeline: &p, lastStage: name}
}

// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage
func NewPresetIngesterPipeline() PipelineBuilderStage {
p := pipeline{
stages: []Stage{},
config: []StageParam{},
}
return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage}
}

func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage {
b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage})
b.pipeline.config = append(b.pipeline.config, param)
Expand Down Expand Up @@ -164,3 +175,15 @@ func (b *PipelineBuilderStage) GetStages() []Stage {
func (b *PipelineBuilderStage) GetStageParams() []StageParam {
return b.pipeline.config
}

// IntoConfigFileStruct injects the current pipeline and params in the provided ConfigFileStruct object.
func (b *PipelineBuilderStage) IntoConfigFileStruct(cfs *ConfigFileStruct) *ConfigFileStruct {
cfs.Pipeline = b.GetStages()
cfs.Parameters = b.GetStageParams()
return cfs
}

// ToConfigFileStruct returns the current pipeline and params as a new ConfigFileStruct object.
func (b *PipelineBuilderStage) ToConfigFileStruct() *ConfigFileStruct {
return b.IntoConfigFileStruct(&ConfigFileStruct{})
}
15 changes: 2 additions & 13 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
promserver.StartServerAsync(cfg.PromConnectionInfo, nil)
} else {
registerer = prometheus.DefaultRegisterer
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/pipeline/ingest/ingest_inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ingest

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
)

// InProcess ingester is meant to be imported and used from another program
// via pipeline.StartFLPInProcess
type InProcess struct {
in chan config.GenericMap
}

func NewInProcess(in chan config.GenericMap) *InProcess {
return &InProcess{in: in}
}

func (d *InProcess) Ingest(out chan<- config.GenericMap) {
go func() {
<-utils.ExitChannel()
d.Close()
}()
for rec := range d.in {
out <- rec
}
}

func (d *InProcess) Close() {
}
32 changes: 32 additions & 0 deletions pkg/pipeline/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pipeline

import (
"context"
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
)

// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code
func StartFLPInProcess(cfg *config.ConfigFileStruct, in chan config.GenericMap) error {
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Create new flows pipeline
ingester := ingest.NewInProcess(in)
flp, err := newPipelineFromIngester(cfg, ingester)
if err != nil {
return fmt.Errorf("failed to initialize pipeline %w", err)
}

// Starts the flows pipeline; blocking call
go func() {
flp.Run()
if promServer != nil {
_ = promServer.Shutdown(context.Background())
}
}()

return nil
}
55 changes: 55 additions & 0 deletions pkg/pipeline/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pipeline

import (
"bufio"
"encoding/json"
"os"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInProcessFLP(t *testing.T) {
pipeline := config.NewPresetIngesterPipeline()
pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"})
in := make(chan config.GenericMap, 100)
defer close(in)
err := StartFLPInProcess(pipeline.ToConfigFileStruct(), in)
require.NoError(t, err)

capturedOut, w, _ := os.Pipe()
old := os.Stdout
os.Stdout = w
defer func() {
os.Stdout = old
}()

// yield thread to allow pipe services correctly start
time.Sleep(10 * time.Millisecond)

in <- config.GenericMap{
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": float64(1),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
}

scanner := bufio.NewScanner(capturedOut)
require.True(t, scanner.Scan())
capturedRecord := map[string]interface{}{}
bytes := scanner.Bytes()
require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes))

assert.EqualValues(t, map[string]interface{}{
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": float64(1),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
}, capturedRecord)
}
23 changes: 15 additions & 8 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/gopipes/pkg/node"
log "github.com/sirupsen/logrus"
)
Expand All @@ -48,18 +49,24 @@ type Pipeline struct {

// NewPipeline defines the pipeline elements
func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) {
log.Debugf("entering NewPipeline")
return newPipelineFromIngester(cfg, nil)
}

// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver)
func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) {
log.Debugf("entering newPipelineFromIngester")

stages := cfg.Pipeline
log.Debugf("stages = %v ", stages)
configParams := cfg.Parameters
log.Debugf("configParams = %v ", configParams)
log.Debugf("stages = %v ", cfg.Pipeline)
log.Debugf("configParams = %v ", cfg.Parameters)

build := newBuilder(cfg)
if err := build.readStages(); err != nil {
builder := newBuilder(cfg)
if ing != nil {
builder.presetIngester(ing)
}
if err := builder.readStages(); err != nil {
return nil, err
}
return build.build()
return builder.build()
}

func (p *Pipeline) Run() {
Expand Down
21 changes: 18 additions & 3 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder {
}
}

// use a preset ingester
func (b *builder) presetIngester(ing ingest.Ingester) {
name := config.PresetIngesterStage
log.Debugf("stage = %v", name)
b.appendEntry(pipelineEntry{
stageName: name,
stageType: StageIngest,
Ingester: ing,
})
}

// read the configuration stages definition and instantiate the corresponding native Go objects
func (b *builder) readStages() error {
for _, param := range b.configParams {
Expand All @@ -124,14 +135,18 @@ func (b *builder) readStages() error {
if err != nil {
return err
}
b.pipelineEntryMap[param.Name] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
b.appendEntry(pEntry)
}
log.Debugf("pipeline = %v", b.pipelineStages)
return nil
}

func (b *builder) appendEntry(pEntry pipelineEntry) {
b.pipelineEntryMap[pEntry.stageName] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
}

// reads the configured Go stages and connects between them
// readStages must be invoked before this
func (b *builder) build() (*Pipeline, error) {
Expand Down
Loading

0 comments on commit f3b03fa

Please sign in to comment.