-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ingest_stdin #488
Add ingest_stdin #488
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #488 +/- ##
==========================================
+ Coverage 66.03% 67.04% +1.01%
==========================================
Files 94 94
Lines 6921 6707 -214
==========================================
- Hits 4570 4497 -73
+ Misses 2092 1944 -148
- Partials 259 266 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
☔ View full report in Codecov by Sentry. |
pkg/pipeline/ingest/ingest_stdin.go
Outdated
if params.Ingest == nil || params.Ingest.Stdin == nil || params.Ingest.Stdin.Decoder.Type == "" { | ||
return nil, fmt.Errorf("stdin decoder not specified") | ||
} | ||
decoderType := params.Ingest.Stdin.Decoder.Type | ||
if decoderType != api.DecoderName("JSON") { | ||
return nil, fmt.Errorf("stdin supports only json decoder. Given decoder type: %v", decoderType) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say, if only JSON is supported, maybe do not make it configurable? Like we're doing with grpc => it assumes protobuf.
Or if you really prefer to make it configurable, maybe it should assume json as a default when unset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I'll go with the first option.
pkg/pipeline/ingest/ingest_stdin.go
Outdated
func NewIngestStdin(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) { | ||
slog.Debugf("Entering NewIngestStdin") | ||
|
||
in := make(chan string, channelSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should channelSize
now be moved to a more central location (perhaps ingest.go)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I introduced a new constant stdinChannelSize
to decouple the stdin ingester from the other ingester.
/lgtm |
New changes are detected. LGTM label has been removed. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
thanks @ronensc ! |
This PR adds a standard input ingester.
Currently it supports only json formats and assumes each line corresponds to a flowlog.
Closes #486.
Simple configuration to test it:
Run FLP:
and type:
This also allows Unix pipeline:
cc @aslom