Skip to content

Commit

Permalink
NETOBSERV-1471 add gRPC write stage (#621)
Browse files Browse the repository at this point in the history
* add grpc write stage

* add testing
  • Loading branch information
jpinsonneau authored Mar 12, 2024
1 parent 53e97a5 commit ddc1f67
Show file tree
Hide file tree
Showing 12 changed files with 673 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pkg/api/write_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package api

import "errors"

type WriteGRPC struct {
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"the host name or IP of the target Flow collector"`
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"the port of the target Flow collector"`
}

func (w *WriteGRPC) Validate() error {
if w == nil {
return errors.New("you must provide a configuration")
}
if w.TargetHost == "" {
return errors.New("targetHost can't be empty")
}
if w.TargetPort == 0 {
return errors.New("targetPort can't be empty")
}
return nil
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type Write struct {
Loki *api.WriteLoki `yaml:"loki,omitempty" json:"loki,omitempty"`
Stdout *api.WriteStdout `yaml:"stdout,omitempty" json:"stdout,omitempty"`
Ipfix *api.WriteIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"`
GRPC *api.WriteGRPC `yaml:"grpc,omitempty" json:"grpc,omitempty"`
}

// ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write.
var writer write.Writer
var err error
switch params.Write.Type {
case api.GRPCType:
writer, err = write.NewWriteGRPC(params)
case api.StdoutType:
writer, err = write.NewWriteStdout(params)
case api.NoneType:
Expand Down
40 changes: 40 additions & 0 deletions pkg/pipeline/write/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package grpc

import (
"flag"
"log"

pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// ClientConnection wraps a gRPC+protobuf connection
type ClientConnection struct {
client pb.CollectorClient
conn *grpc.ClientConn
}

func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) {
flag.Parse()
// Set up a connection to the server.
socket := utils.GetSocket(hostIP, hostPort)
conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}

return &ClientConnection{
client: pb.NewCollectorClient(conn),
conn: conn,
}, nil
}

func (cp *ClientConnection) Client() pb.CollectorClient {
return cp.client
}

func (cp *ClientConnection) Close() error {
return cp.conn.Close()
}
209 changes: 209 additions & 0 deletions pkg/pipeline/write/grpc/genericmap/genericmap.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 105 additions & 0 deletions pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ddc1f67

Please sign in to comment.