WIP NETOBSERV-1471 add tcp write stage #604

Closed · wants to merge 1 commit
38 changes: 37 additions & 1 deletion contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -1,7 +1,8 @@
# This file was generated automatically by flowlogs-pipeline confgenerator
log-level: error
metricsSettings:
port: 9102
promconnectioninfo:
port: 9102
prefix: flp_op_
pipeline:
- name: ingest_collector
@@ -252,41 +253,49 @@ parameters:
encode:
type: prom
prom:
promconnectioninfo: null
metrics:
- name: bandwidth_per_network_service
type: counter
filters:
- key: name
value: bandwidth_network_service
type: ""
valueKey: recent_op_value
labels:
- groupByKeys
- aggregate
buckets: []
valueScale: 0
- name: bandwidth_per_source_destination_subnet
type: counter
filters:
- key: name
value: bandwidth_source_destination_subnet
type: ""
valueKey: recent_op_value
labels:
- groupByKeys
- aggregate
buckets: []
valueScale: 0
- name: bandwidth_per_source_subnet
type: gauge
filters:
- key: name
value: bandwidth_source_subnet
type: ""
valueKey: bytes
labels:
- srcSubnet
buckets: []
valueScale: 0
- name: connection_size_histogram
type: agg_histogram
filters:
- key: name
value: connection_bytes_hist
type: ""
valueKey: recent_raw_values
labels:
- groupByKeys
@@ -297,11 +306,13 @@ parameters:
- 10240
- 102400
- 1.048576e+06
valueScale: 0
- name: connection_size_histogram_ab
type: agg_histogram
filters:
- key: name
value: connection_bytes_hist_AB
type: ""
valueKey: recent_raw_values
labels:
- groupByKeys
@@ -312,11 +323,13 @@ parameters:
- 10240
- 102400
- 1.048576e+06
valueScale: 0
- name: connection_size_histogram_ba
type: agg_histogram
filters:
- key: name
value: connection_bytes_hist_BA
type: ""
valueKey: recent_raw_values
labels:
- groupByKeys
@@ -327,92 +340,110 @@ parameters:
- 10240
- 102400
- 1.048576e+06
valueScale: 0
- name: connections_per_destination_subnet
type: counter
filters:
- key: name
value: dest_connection_subnet_count
type: ""
valueKey: recent_count
labels:
- _RecordType
- dstSubnet
buckets: []
valueScale: 0
- name: connections_per_source_subnet
type: counter
filters:
- key: name
value: src_connection_count
type: ""
valueKey: recent_count
labels:
- srcSubnet
- _RecordType
buckets: []
valueScale: 0
- name: connections_per_tcp_flags
type: counter
filters:
- key: name
value: TCPFlags_count
type: ""
valueKey: recent_count
labels:
- groupByKeys
- aggregate
buckets: []
valueScale: 0
- name: connections_per_destination_as
type: counter
filters:
- key: name
value: dst_as_connection_count
type: ""
valueKey: recent_count
labels:
- dstAS
- _RecordType
buckets: []
valueScale: 0
- name: connections_per_source_as
type: counter
filters:
- key: name
value: src_as_connection_count
type: ""
valueKey: recent_count
labels:
- srcAS
- _RecordType
buckets: []
valueScale: 0
- name: count_per_source_destination_subnet
type: counter
filters:
- key: name
value: count_source_destination_subnet
type: ""
valueKey: recent_count
labels:
- dstSubnet24
- srcSubnet24
- _RecordType
buckets: []
valueScale: 0
- name: egress_per_destination_subnet
type: counter
filters:
- key: name
value: bandwidth_destination_subnet
type: ""
valueKey: recent_op_value
labels:
- groupByKeys
- aggregate
buckets: []
valueScale: 0
- name: egress_per_namespace
type: counter
filters:
- key: name
value: bandwidth_namespace
type: ""
valueKey: recent_op_value
labels:
- groupByKeys
- aggregate
buckets: []
valueScale: 0
- name: flows_length_histogram
type: agg_histogram
filters:
- key: name
value: flows_bytes_hist
type: ""
valueKey: recent_raw_values
labels:
- groupByKeys
@@ -423,26 +454,31 @@ parameters:
- 10240
- 102400
- 1.048576e+06
valueScale: 0
- name: connections_per_destination_location
type: counter
filters:
- key: name
value: dest_connection_location_count
type: ""
valueKey: recent_count
labels:
- dstLocation_CountryName
- _RecordType
buckets: []
valueScale: 0
- name: service_count
type: counter
filters:
- key: name
value: dest_service_count
type: ""
valueKey: recent_count
labels:
- service
- _RecordType
buckets: []
valueScale: 0
prefix: flp_
- name: write_loki
write:
2 changes: 2 additions & 0 deletions pkg/api/api.go
@@ -31,6 +31,7 @@ const (
	OtlpLogsType = "otlplogs"
	OtlpMetricsType = "otlpmetrics"
	OtlpTracesType = "otlptraces"
	TCPType = "tcp"
	StdoutType = "stdout"
	LokiType = "loki"
	IpfixType = "ipfix"

@@ -74,6 +75,7 @@ type API struct {
	TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
	TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
	WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
	WriteTCP WriteTCP `yaml:"tcp" doc:"## Write TCP\nFollowing is the supported API format for writing to tcp:\n"`
	WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
	ExtractAggregate Aggregates `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
	ConnectionTracking ConnTrack `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"`
5 changes: 5 additions & 0 deletions pkg/api/write_tcp.go
@@ -0,0 +1,5 @@
package api

type WriteTCP struct {
	Port string `yaml:"port,omitempty" json:"port,omitempty" doc:"TCP port number"`
}
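For reference, a pipeline write stage using this new API could be configured along these lines — a hypothetical fragment matching the yaml tags above (the stage name and port value are illustrative, not from the PR):

```yaml
  - name: write_tcp
    write:
      type: tcp
      tcp:
        port: "9999"
```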
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -126,6 +126,7 @@ type Encode struct {

type Write struct {
	Type string `yaml:"type" json:"type"`
	TCP *api.WriteTCP `yaml:"tcp,omitempty" json:"tcp,omitempty"`
	Loki *api.WriteLoki `yaml:"loki,omitempty" json:"loki,omitempty"`
	Stdout *api.WriteStdout `yaml:"stdout,omitempty" json:"stdout,omitempty"`
	Ipfix *api.WriteIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"`
5 changes: 5 additions & 0 deletions pkg/config/pipeline_builder.go
@@ -151,6 +151,11 @@ func (b *PipelineBuilderStage) EncodeS3(name string, s3 api.EncodeS3) PipelineBuilderStage {
	return b.next(name, NewEncodeS3Params(name, s3))
}

// WriteTCP chains the current stage with a WriteTCP stage and returns that new stage
func (b *PipelineBuilderStage) WriteTCP(name string, tcp api.WriteTCP) PipelineBuilderStage {
	return b.next(name, NewWriteTCPParams(name, tcp))
}

// WriteStdout chains the current stage with a WriteStdout stage and returns that new stage
func (b *PipelineBuilderStage) WriteStdout(name string, stdout api.WriteStdout) PipelineBuilderStage {
	return b.next(name, NewWriteStdoutParams(name, stdout))
4 changes: 4 additions & 0 deletions pkg/config/stage_params.go
@@ -69,6 +69,10 @@ func NewEncodeS3Params(name string, s3 api.EncodeS3) StageParam {
	return StageParam{Name: name, Encode: &Encode{Type: api.S3Type, S3: &s3}}
}

func NewWriteTCPParams(name string, tcp api.WriteTCP) StageParam {
	return StageParam{Name: name, Write: &Write{Type: api.TCPType, TCP: &tcp}}
}

func NewWriteStdoutParams(name string, stdout api.WriteStdout) StageParam {
	return StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}}
}
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
@@ -368,6 +368,8 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write.Writer, error) {
	var writer write.Writer
	var err error
	switch params.Write.Type {
	case api.TCPType:
		writer, err = write.NewWriteTCP(params)
	case api.StdoutType:
		writer, err = write.NewWriteStdout(params)
	case api.NoneType:
69 changes: 69 additions & 0 deletions pkg/pipeline/write/write_tcp.go
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2021 IBM, Inc.
Review comment (Collaborator): Fix (or remove) Copyright statement

Reply (Author): This is coming from both the eBPF packets exporter and the flp stdout write stage 😄
I just copied the code from there, but I can remove the copyright if you want.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package write

import (
	"encoding/json"
	"fmt"
	"net"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/sirupsen/logrus"
)

type writeTCP struct {
	address string
	conn    net.Conn
}

// Write writes a flow to the TCP connection
func (t *writeTCP) Write(v config.GenericMap) {
	logrus.Tracef("entering writeTCP Write")
	b, err := json.Marshal(v)
	if err != nil {
		logrus.WithError(err).Warn("can't marshal record")
		return
	}
	// append a newline after each record so the client can split records
	b = append(b, '\n')
	if _, err := t.conn.Write(b); err != nil {
		logrus.WithError(err).Warn("can't write to tcp connection")
	}
}

// NewWriteTCP creates a new TCP writer
func NewWriteTCP(params config.StageParam) (Writer, error) {
	logrus.Debugf("entering NewWriteTCP")
	writeTCP := &writeTCP{}
	if params.Write != nil && params.Write.TCP != nil && params.Write.TCP.Port != "" {
		writeTCP.address = ":" + params.Write.TCP.Port
	} else {
		return nil, fmt.Errorf("write.tcp.port must be specified")
	}

	logrus.Debugf("NewWriteTCP Listen %s", writeTCP.address)
	l, err := net.Listen("tcp", writeTCP.address)
	if err != nil {
		return nil, err
	}
	// the listener is closed when this function returns; the accepted
	// connection below stays open
	defer l.Close()
	// blocks until a single client connects
	clientConn, err := l.Accept()
Comment on lines +57 to +62
Review comment (Collaborator):
It looks like you are expecting some other entity to initiate a TCP connection to FLP, to which FLP will write its flow logs.
The code will block here until a single connection is completed. The rest of FLP will wait on this connection. Is this the desired behavior?

What is the use case you have in mind?

What if I have multiple targets to which I want to send my data via TCP? Can I set up multiple writeTCP stages? How will it work? Or do we want to put the listen() operation into a separate thread, accumulate the connections that were attempted, and send flow logs to all current connections?

Reply (Author):
This is a temporary PR used in the netobserv CLI: netobserv/network-observability-cli#2 (comment)

The CLI deploys eBPF agents using direct-flp mode, which embeds flowlogs-pipeline into the same pods.
Currently the connection between these pods and the CLI is made using TCP after calling oc port-forward commands.
Using gRPC instead is still in discussion.

If we keep that TCP implementation, we can indeed improve it and make the wait optional + allow multiple targets 👍

	if err != nil {
		return nil, err
	}
	writeTCP.conn = clientConn

	return writeTCP, nil
}