Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingest_stdin #488

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func run() {
// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
if err != nil {
log.Fatalf("failed to initialize pipeline %s", err)
log.Fatalf("failed to initialize pipeline: %s", err)
os.Exit(1)
}

Expand Down
6 changes: 6 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ Following is the supported API format for the Network Observability eBPF ingest:
port: the port number to listen on
bufferLength: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)
</pre>
## Ingest Standard Input
Following is the supported API format for the standard input ingest:

<pre>
stdin:
</pre>
## Transform Generic API
Following is the supported API format for generic transformations:

Expand Down
2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
FileChunksType = "file_chunks"
SyntheticType = "synthetic"
CollectorType = "collector"
StdinType = "stdin"
GRPCType = "grpc"
FakeType = "fake"
KafkaType = "kafka"
Expand Down Expand Up @@ -60,6 +61,7 @@ type API struct {
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
IngestStdin IngestStdin `yaml:"stdin" doc:"## Ingest Standard Input\nFollowing is the supported API format for the standard input ingest:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
Expand Down
21 changes: 21 additions & 0 deletions pkg/api/ingest_stdin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

// IngestStdin holds the configuration for the standard-input ingest stage.
// It currently has no parameters; its presence in the pipeline configuration
// is what selects the stdin ingester.
type IngestStdin struct {
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Ingest struct {
Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
Stdin *api.IngestStdin `yaml:"stdin,omitempty" json:"stdin,omitempty"`
}

type File struct {
Expand Down
118 changes: 118 additions & 0 deletions pkg/pipeline/ingest/ingest_stdin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"bufio"
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
pUtils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/sirupsen/logrus"
)

const (
	// stdinChannelSize is the buffer size, in lines, of the channel between
	// the stdin-reading goroutine and the line-processing loop.
	stdinChannelSize = 1000
)

// slog is the package-scoped logger for the stdin ingest component.
var slog = logrus.WithField("component", "ingest.Stdin")

// ingestStdin reads JSON lines from standard input and feeds them into the
// pipeline as GenericMaps.
type ingestStdin struct {
	in       chan string     // lines read from stdin, buffered (stdinChannelSize)
	eof      chan struct{}   // closed by the reader goroutine on EOF or read error
	exitChan <-chan struct{} // global pipeline shutdown signal
	metrics  *metrics        // stage metrics (queue length, flows, errors)
	decoder  decode.Decoder  // JSON decoder applied to each line
}

// Ingest reads entries from standard input and emits the decoded flows on out.
// It blocks until EOF is reached or the pipeline is signalled to exit.
func (s *ingestStdin) Ingest(out chan<- config.GenericMap) {
	slog.Debugf("entering ingestStdin.Ingest")
	s.metrics.createOutQueueLen(out)

	// The reader goroutine fills s.in; this goroutine consumes and decodes.
	go s.getStdinInput()
	s.processLogLines(out)
}

// getStdinInput reads stdin line by line and forwards each line on s.in.
// It closes s.eof when stdin is exhausted or a read error occurs, and it
// stops early if the pipeline exit signal fires so it cannot leak while
// blocked on a full channel after the consumer has gone away.
func (s *ingestStdin) getStdinInput() {
	scanner := bufio.NewScanner(os.Stdin)
	// Loop to read lines from stdin until an error or EOF is encountered
	for scanner.Scan() {
		select {
		case s.in <- scanner.Text():
		case <-s.exitChan:
			// Consumer exited; abandon the remaining input instead of
			// blocking forever on the send.
			return
		}
	}

	// Check for errors
	if err := scanner.Err(); err != nil {
		slog.WithError(err).Errorf("Error reading standard input")
	}
	close(s.eof)
}

// processLogLines consumes lines from s.in and forwards the decoded records
// to out until the pipeline exit signal fires or stdin reaches EOF. On EOF
// it first drains any lines still buffered in s.in, so input read shortly
// before EOF is not dropped by the select racing s.eof against s.in.
func (s *ingestStdin) processLogLines(out chan<- config.GenericMap) {
	for {
		select {
		case <-s.exitChan:
			slog.Debugf("exiting ingestStdin because of signal")
			return
		case <-s.eof:
			// Drain remaining buffered lines before exiting.
			for {
				select {
				case line := <-s.in:
					s.processRecord(out, line)
				default:
					slog.Debugf("exiting ingestStdin because of EOF")
					return
				}
			}
		case line := <-s.in:
			s.processRecord(out, line)
		}
	}
}

// processRecord decodes a single input line and, on success, forwards the
// resulting GenericMap downstream; malformed lines are logged and counted
// as errors but otherwise skipped.
func (s *ingestStdin) processRecord(out chan<- config.GenericMap, line string) {
	slog.Debugf("Decoding %s", line)
	record, err := s.decoder.Decode([]byte(line))
	if err != nil {
		s.metrics.error("Ignoring line")
		slog.WithError(err).Warnf("ignoring line %v", line)
		return
	}
	s.metrics.flowsProcessed.Inc()
	out <- record
}

// NewIngestStdin creates a new stdin ingester that decodes each input line
// as JSON. It returns an error if the JSON decoder cannot be constructed.
func NewIngestStdin(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
	slog.Debugf("Entering NewIngestStdin")

	lines := make(chan string, stdinChannelSize)
	eofSignal := make(chan struct{})
	// Queue-length gauge reports the number of lines waiting to be processed.
	stageMetrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(lines) })
	jsonDecoder, err := decode.GetDecoder(api.Decoder{Type: api.DecoderName("JSON")})
	if err != nil {
		return nil, err
	}

	return &ingestStdin{
		in:       lines,
		eof:      eofSignal,
		exitChan: pUtils.ExitChannel(),
		metrics:  stageMetrics,
		decoder:  jsonDecoder,
	}, nil
}
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge
ingester, err = ingest.NewIngestSynthetic(opMetrics, params)
case api.CollectorType:
ingester, err = ingest.NewIngestCollector(opMetrics, params)
case api.StdinType:
ingester, err = ingest.NewIngestStdin(opMetrics, params)
case api.KafkaType:
ingester, err = ingest.NewIngestKafka(opMetrics, params)
case api.GRPCType:
Expand Down