From 86379eb41cddc69ac9ef5adc44332294d8327680 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 16 Nov 2023 14:58:26 +0100 Subject: [PATCH] Allow to start FLP directly from the flow logs producer - New "InProcess" ingest stage - New entry point to start the whole FLP in-process: pipeline.StartFLPInProcess - Add some tests --- cmd/flowlogs-pipeline/main.go | 26 +---- pkg/config/pipeline_builder.go | 11 +++ pkg/pipeline/encode/encode_prom.go | 15 +-- pkg/pipeline/ingest/ingest_inprocess.go | 42 ++++++++ pkg/pipeline/inprocess.go | 32 ++++++ pkg/pipeline/inprocess_test.go | 125 ++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 23 +++-- pkg/pipeline/pipeline_builder.go | 21 +++- pkg/pipeline/utils/prom_server.go | 52 ---------- pkg/prometheus/prom_server.go | 91 +++++++++++++++++ 10 files changed, 339 insertions(+), 99 deletions(-) create mode 100644 pkg/pipeline/ingest/ingest_inprocess.go create mode 100644 pkg/pipeline/inprocess.go create mode 100644 pkg/pipeline/inprocess_test.go delete mode 100644 pkg/pipeline/utils/prom_server.go create mode 100644 pkg/prometheus/prom_server.go diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index eb618bb63..4e5382cc0 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -19,7 +19,6 @@ package main import ( "context" - "crypto/tls" "encoding/json" "fmt" "net/http" @@ -35,7 +34,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/prometheus/client_golang/prometheus" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -182,27 +181,8 @@ func run() { // Setup (threads) exit manager utils.SetupElegantExit() - - // set up private prometheus registry - if cfg.MetricsSettings.SuppressGoMetrics { - reg := prometheus.NewRegistry() - 
prometheus.DefaultRegisterer = reg - prometheus.DefaultGatherer = reg - } - - // create prometheus server for operational metrics - // if value of address is empty, then by default it will take 0.0.0.0 - addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.MetricsSettings.TLS - go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry)) + prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) + promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index f52dbc95e..1ed13b104 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -48,6 +48,8 @@ type PipelineBuilderStage struct { pipeline *pipeline } +const PresetIngesterStage = "preset-ingester" + // NewPipeline creates a new pipeline from an existing ingest func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { if ingest.Collector != nil { @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage return PipelineBuilderStage{pipeline: &p, lastStage: name} } +// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage +func NewPresetIngesterPipeline() PipelineBuilderStage { + p := pipeline{ + stages: []Stage{}, + config: []StageParam{}, + } + return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage} +} + func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage { b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage}) 
b.pipeline.config = append(b.pipeline.config, param) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f4812140b..a33886633 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,9 +18,7 @@ package encode import ( - "crypto/tls" "fmt" - "net/http" "strings" "time" @@ -28,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/prometheus/client_golang/prometheus" @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En if cfg.PromConnectionInfo != nil { registry := prometheus.NewRegistry() registerer = registry - addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port) - log.Infof("startServer: addr = %s", addr) - promServer := &http.Server{ - Addr: addr, - // TLS clients must use TLS 1.2 or higher - TLSConfig: &tls.Config{ - MinVersion: tls.VersionTLS12, - }, - } - tlsConfig := cfg.PromConnectionInfo.TLS - go putils.StartPromServer(tlsConfig, promServer, true, registry) + promserver.StartServerAsync(cfg.PromConnectionInfo, registry) // serve the dedicated registry created above; passing nil would expose the default gatherer instead } else { registerer = prometheus.DefaultRegisterer } diff --git a/pkg/pipeline/ingest/ingest_inprocess.go b/pkg/pipeline/ingest/ingest_inprocess.go new file mode 100644 index 000000000..44d15652a --- /dev/null +++ b/pkg/pipeline/ingest/ingest_inprocess.go @@ -0,0 +1,42 @@ +package ingest + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" + "github.com/sirupsen/logrus" +) + +var ilog = logrus.WithField("component",
"ingest.InProcess") + +// InProcess ingester, meant to be imported and used from another program via pipeline.StartFLPInProcess +type InProcess struct { + flowPackets chan *pbflow.Records +} + +func NewInProcess(flowPackets chan *pbflow.Records) *InProcess { + return &InProcess{flowPackets: flowPackets} +} + +func (d *InProcess) Ingest(out chan<- config.GenericMap) { + go func() { + <-utils.ExitChannel() + d.Close() + }() + for fp := range d.flowPackets { + ilog.Debugf("Ingested %v records", len(fp.Entries)) + for _, entry := range fp.Entries { + out <- decode.PBFlowToMap(entry) + } + } +} + +func (d *InProcess) Write(record *pbflow.Records) { + d.flowPackets <- record +} + +func (d *InProcess) Close() { + close(d.flowPackets) +} diff --git a/pkg/pipeline/inprocess.go b/pkg/pipeline/inprocess.go new file mode 100644 index 000000000..aec30462c --- /dev/null +++ b/pkg/pipeline/inprocess.go @@ -0,0 +1,32 @@ +package pipeline + +import ( + "context" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" + "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" +) + +// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code +func StartFLPInProcess(cfg config.ConfigFileStruct) (*ingest.InProcess, error) { + prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings) + promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil) + + // Create new flows pipeline + ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100)) + flp, err := newPipelineFromIngester(&cfg, ingester) + if err != nil { + return nil, fmt.Errorf("failed to initialize pipeline %w", err) + } + + // Starts the flows pipeline; blocking call + go func() { + flp.Run() + _ = promServer.Shutdown(context.Background()) + }() + + return ingester, nil +} diff --git a/pkg/pipeline/inprocess_test.go b/pkg/pipeline/inprocess_test.go new file
mode 100644 index 000000000..69603ae4f --- /dev/null +++ b/pkg/pipeline/inprocess_test.go @@ -0,0 +1,125 @@ +package pipeline + +import ( + "bufio" + "encoding/json" + "os" + "testing" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestInProcessFLP(t *testing.T) { + pipeline := config.NewPresetIngesterPipeline() + pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"}) + cfs := config.ConfigFileStruct{ + Pipeline: pipeline.GetStages(), + Parameters: pipeline.GetStageParams(), + } + ingester, err := StartFLPInProcess(cfs) + require.NoError(t, err) + defer ingester.Close() + + capturedOut, w, _ := os.Pipe() + old := os.Stdout + os.Stdout = w + defer func() { + os.Stdout = old + }() + + // yield thread to allow pipe services correctly start + time.Sleep(10 * time.Millisecond) + + startTime := time.Now() + endTime := startTime.Add(7 * time.Second) + someDuration := endTime.Sub(startTime) + + ingester.Write(&pbflow.Records{ + Entries: []*pbflow.Record{{ + Interface: "eth0", + EthProtocol: 2048, + Bytes: 456, + Packets: 123, + Direction: pbflow.Direction_EGRESS, + TimeFlowStart: timestamppb.New(startTime), + TimeFlowEnd: timestamppb.New(endTime), + Network: &pbflow.Network{ + SrcAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, + }, + DstAddr: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708}, + }, + Dscp: 1, + }, + DataLink: &pbflow.DataLink{ + DstMac: 0x112233445566, + SrcMac: 0x010203040506, + }, + Transport: &pbflow.Transport{ + Protocol: 17, + SrcPort: 23000, + DstPort: 443, + }, + AgentIp: &pbflow.IP{ + IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a0b0c0d}, + }, + PktDropBytes: 100, + PktDropPackets: 10, + 
PktDropLatestFlags: 1, + PktDropLatestState: 1, + PktDropLatestDropCause: 8, + DnsLatency: durationpb.New(someDuration), + DnsId: 1, + DnsFlags: 0x80, + DnsErrno: 0, + TimeFlowRtt: durationpb.New(someDuration), + }}, + }) + + scanner := bufio.NewScanner(capturedOut) + require.True(t, scanner.Scan()) + capturedRecord := map[string]interface{}{} + bytes := scanner.Bytes() + require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes)) + + assert.NotZero(t, capturedRecord["TimeReceived"]) + delete(capturedRecord, "TimeReceived") + assert.EqualValues(t, map[string]interface{}{ + "FlowDirection": float64(1), + "Bytes": float64(456), + "SrcAddr": "1.2.3.4", + "DstAddr": "5.6.7.8", + "Dscp": float64(1), + "DstMac": "11:22:33:44:55:66", + "SrcMac": "01:02:03:04:05:06", + "SrcPort": float64(23000), + "DstPort": float64(443), + "Duplicate": false, + "Etype": float64(2048), + "Packets": float64(123), + "Proto": float64(17), + "TimeFlowStartMs": float64(startTime.UnixMilli()), + "TimeFlowEndMs": float64(endTime.UnixMilli()), + "Interface": "eth0", + "AgentIP": "10.11.12.13", + "PktDropBytes": float64(100), + "PktDropPackets": float64(10), + "PktDropLatestFlags": float64(1), + "PktDropLatestState": "TCP_ESTABLISHED", + "PktDropLatestDropCause": "SKB_DROP_REASON_NETFILTER_DROP", + "DnsLatencyMs": float64(someDuration.Milliseconds()), + "DnsId": float64(1), + "DnsFlags": float64(0x80), + "DnsErrno": float64(0), + "DnsFlagsResponseCode": "NoError", + "TimeFlowRttNs": float64(someDuration.Nanoseconds()), + }, capturedRecord) +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index e9e923116..12cadfce5 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -22,6 +22,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" "github.com/netobserv/gopipes/pkg/node" log "github.com/sirupsen/logrus" ) @@ -48,18 
+49,24 @@ type Pipeline struct { // NewPipeline defines the pipeline elements func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) { - log.Debugf("entering NewPipeline") + return newPipelineFromIngester(cfg, nil) +} + +// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver) +func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) { + log.Debugf("entering newPipelineFromIngester") - stages := cfg.Pipeline - log.Debugf("stages = %v ", stages) - configParams := cfg.Parameters - log.Debugf("configParams = %v ", configParams) + log.Debugf("stages = %v ", cfg.Pipeline) + log.Debugf("configParams = %v ", cfg.Parameters) - build := newBuilder(cfg) - if err := build.readStages(); err != nil { + builder := newBuilder(cfg) + if ing != nil { + builder.presetIngester(ing) + } + if err := builder.readStages(); err != nil { return nil, err } - return build.build() + return builder.build() } func (p *Pipeline) Run() { diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index ac752e645..a3141a6f2 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder { } } +// use a preset ingester +func (b *builder) presetIngester(ing ingest.Ingester) { + name := config.PresetIngesterStage + log.Debugf("stage = %v", name) + b.appendEntry(pipelineEntry{ + stageName: name, + stageType: StageIngest, + Ingester: ing, + }) +} + // read the configuration stages definition and instantiate the corresponding native Go objects func (b *builder) readStages() error { for _, param := range b.configParams { @@ -124,14 +135,18 @@ func (b *builder) readStages() error { if err != nil { return err } - b.pipelineEntryMap[param.Name] = &pEntry - b.pipelineStages = append(b.pipelineStages, &pEntry) - log.Debugf("pipeline = %v", b.pipelineStages) + b.appendEntry(pEntry) } 
log.Debugf("pipeline = %v", b.pipelineStages) return nil } +func (b *builder) appendEntry(pEntry pipelineEntry) { + b.pipelineEntryMap[pEntry.stageName] = &pEntry + b.pipelineStages = append(b.pipelineStages, &pEntry) + log.Debugf("pipeline = %v", b.pipelineStages) +} + // reads the configured Go stages and connects between them // readStages must be invoked before this func (b *builder) build() (*Pipeline, error) { diff --git a/pkg/pipeline/utils/prom_server.go b/pkg/pipeline/utils/prom_server.go deleted file mode 100644 index 17d0e8c7d..000000000 --- a/pkg/pipeline/utils/prom_server.go +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2023 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package utils - -import ( - "net/http" - "os" - - "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" -) - -// StartPromServer listens for prometheus resource usage requests -func StartPromServer(tlsConfig *api.PromTLSConf, server *http.Server, panicOnError bool, reg *prometheus.Registry) { - logrus.Debugf("entering StartPromServer") - - // The Handler function provides a default handler to expose metrics - // via an HTTP server. "/metrics" is the usual endpoint for that. 
- mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - server.Handler = mux - - var err error - if tlsConfig != nil { - err = server.ListenAndServeTLS(tlsConfig.CertPath, tlsConfig.KeyPath) - } else { - err = server.ListenAndServe() - } - if err != nil && err != http.ErrServerClosed { - logrus.Errorf("error in http.ListenAndServe: %v", err) - if panicOnError { - os.Exit(1) - } - } -} diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go new file mode 100644 index 000000000..9bde92b89 --- /dev/null +++ b/pkg/prometheus/prom_server.go @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package prometheus + +import ( + "crypto/tls" + "fmt" + "net/http" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +var ( + plog = logrus.WithField("component", "prometheus") + maybePanic = plog.Fatalf +) + +// SetGlobalMetricsSettings initializes some global settings used later when starting server(s) +func SetGlobalMetricsSettings(settings *config.MetricsSettings) { + if settings.SuppressGoMetrics { + // set up private prometheus registry + r := prom.NewRegistry() + prom.DefaultRegisterer = r + prom.DefaultGatherer = r + } + if settings.NoPanic { + maybePanic = plog.Errorf + } +} + +// StartServerAsync listens for prometheus resource usage requests +func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *http.Server { + // create prometheus server for operational metrics + // if value of address is empty, then by default it will take 0.0.0.0 + port := conn.Port + if port == 0 { + port = 9090 + } + addr := fmt.Sprintf("%s:%v", conn.Address, port) + plog.Infof("StartServerAsync: addr = %s", addr) + + httpServer := http.Server{ + Addr: addr, + // TLS clients must use TLS 1.2 or higher + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + } + // The Handler function provides a default handler to expose metrics + // via an HTTP server. "/metrics" is the usual endpoint for that. 
+ mux := http.NewServeMux() + if registry == nil { + mux.Handle("/metrics", promhttp.Handler()) + } else { + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + } + httpServer.Handler = mux + + go func() { + var err error + if conn.TLS != nil { + err = httpServer.ListenAndServeTLS(conn.TLS.CertPath, conn.TLS.KeyPath) + } else { + err = httpServer.ListenAndServe() + } + if err != nil && err != http.ErrServerClosed { + maybePanic("error in http.ListenAndServe: %v", err) + } + }() + + return &httpServer +}