diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go index c2e1fac9e..16b7b7c88 100644 --- a/cmd/flowlogs-pipeline/main.go +++ b/cmd/flowlogs-pipeline/main.go @@ -204,7 +204,7 @@ func run() { // Create new flows pipeline mainPipeline, err = pipeline.NewPipeline(&cfg) if err != nil { - log.Fatalf("failed to initialize pipeline %s", err) + log.Fatalf("failed to initialize pipeline: %s", err) os.Exit(1) } diff --git a/docs/api.md b/docs/api.md index 34f838586..f9c7f3401 100644 --- a/docs/api.md +++ b/docs/api.md @@ -111,6 +111,12 @@ Following is the supported API format for the Network Observability eBPF ingest: port: the port number to listen on bufferLength: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100) +## Ingest Standard Input +Following is the supported API format for the standard input ingest: + +
+ stdin: +## Transform Generic API Following is the supported API format for generic transformations: diff --git a/pkg/api/api.go b/pkg/api/api.go index a49f1e04c..7b441d447 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -23,6 +23,7 @@ const ( FileChunksType = "file_chunks" SyntheticType = "synthetic" CollectorType = "collector" + StdinType = "stdin" GRPCType = "grpc" FakeType = "fake" KafkaType = "kafka" @@ -60,6 +61,7 @@ type API struct { IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"` + IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"` TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"` TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` diff --git a/pkg/api/ingest_stdin.go b/pkg/api/ingest_stdin.go new file mode 100644 index 000000000..a1df3e817 --- /dev/null +++ b/pkg/api/ingest_stdin.go @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+// IngestStdin is the (currently empty) configuration of the standard input ingest.
+type IngestStdin struct{}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 6f01e56a3..d68139a5e 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -91,6 +91,7 @@ type Ingest struct {
 	Kafka     *api.IngestKafka     `yaml:"kafka,omitempty" json:"kafka,omitempty"`
 	GRPC      *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
 	Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
+	Stdin     *api.IngestStdin     `yaml:"stdin,omitempty" json:"stdin,omitempty"`
 }
 
 type File struct {
diff --git a/pkg/pipeline/ingest/ingest_stdin.go b/pkg/pipeline/ingest/ingest_stdin.go
new file mode 100644
index 000000000..1aa248056
--- /dev/null
+++ b/pkg/pipeline/ingest/ingest_stdin.go
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2023 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ingest
+
+import (
+	"bufio"
+	"os"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/operational"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
+	pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// stdinChannelSize is the capacity of the channel that buffers lines
+	// between the stdin reader goroutine and the processing loop.
+	stdinChannelSize = 1000
+)
+
+var slog = logrus.WithField("component", "ingest.Stdin")
+
+// ingestStdin is an ingester that reads lines from standard input and
+// decodes each of them with its decoder.
+type ingestStdin struct {
+	in       chan string     // lines read from stdin, waiting to be decoded
+	eof      chan struct{}   // closed by the reader once stdin is exhausted
+	exitChan <-chan struct{} // fires when the ingester must stop
+	metrics  *metrics
+	decoder  decode.Decoder
+}
+
+// Ingest ingests entries from stdin. It starts a goroutine that reads
+// stdin line by line, then decodes the lines and forwards them to out
+// until stdin is exhausted or the exit channel fires.
+func (s *ingestStdin) Ingest(out chan<- config.GenericMap) {
+	slog.Debugf("entering ingestStdin.Ingest")
+	s.metrics.createOutQueueLen(out)
+
+	go s.getStdinInput()
+
+	// process log lines received by stdin
+	s.processLogLines(out)
+}
+
+// getStdinInput pushes every line read from stdin to s.in and closes s.eof
+// once the scanner stops, either at end of input or on a read error.
+func (s *ingestStdin) getStdinInput() {
+	scanner := bufio.NewScanner(os.Stdin)
+	// Loop to read lines from stdin until an error or EOF is encountered
+	for scanner.Scan() {
+		s.in <- scanner.Text()
+	}
+
+	// Check for errors
+	if err := scanner.Err(); err != nil {
+		slog.WithError(err).Errorf("Error reading standard input")
+	}
+	close(s.eof)
+}
+
+// processLogLines decodes the lines received on s.in and sends them to out.
+// It returns when the exit channel fires or when stdin has been fully
+// consumed and every buffered line has been processed.
+func (s *ingestStdin) processLogLines(out chan<- config.GenericMap) {
+	for {
+		select {
+		case <-s.exitChan:
+			slog.Debugf("exiting ingestStdin because of signal")
+			return
+		case line := <-s.in:
+			s.processRecord(out, line)
+		case <-s.eof:
+			// select picks randomly among ready cases, so eof can win while
+			// lines are still buffered. The reader closes eof only after its
+			// last send and this loop is the only receiver in this file, so
+			// draining until s.in is empty processes every remaining line.
+			for len(s.in) > 0 {
+				s.processRecord(out, <-s.in)
+			}
+			slog.Debugf("exiting ingestStdin because of EOF")
+			return
+		}
+	}
+}
+
+// processRecord decodes a single line and forwards the result to out.
+// A line that cannot be decoded is logged, reported to the metrics and dropped.
+func (s *ingestStdin) processRecord(out chan<- config.GenericMap, line string) {
+	slog.Debugf("Decoding %s", line)
+	decoded, err := s.decoder.Decode([]byte(line))
+	if err != nil {
+		slog.WithError(err).Warnf("ignoring line %v", line)
+		s.metrics.error("Ignoring line")
+		return
+	}
+	s.metrics.flowsProcessed.Inc()
+	out <- decoded
+}
+
+// NewIngestStdin creates a new ingester that reads entries from standard
+// input and decodes them with the "JSON" decoder.
+func NewIngestStdin(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
+	slog.Debugf("Entering NewIngestStdin")
+
+	in := make(chan string, stdinChannelSize)
+	eof := make(chan struct{})
+	metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
+	decoderParams := api.Decoder{Type: api.DecoderName("JSON")}
+	decoder, err := decode.GetDecoder(decoderParams)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ingestStdin{
+		exitChan: pUtils.ExitChannel(),
+		in:       in,
+		eof:      eof,
+		metrics:  metrics,
+		decoder:  decoder,
+	}, nil
+}
diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go
index 1d9fff717..ac752e645 100644
--- a/pkg/pipeline/pipeline_builder.go
+++ b/pkg/pipeline/pipeline_builder.go
@@ -334,6 +334,8 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge
 		ingester, err = ingest.NewIngestSynthetic(opMetrics, params)
 	case api.CollectorType:
 		ingester, err = ingest.NewIngestCollector(opMetrics, params)
+	case api.StdinType:
+		ingester, err = ingest.NewIngestStdin(opMetrics, params)
 	case api.KafkaType:
 		ingester, err = ingest.NewIngestKafka(opMetrics, params)
 	case api.GRPCType: