From 10abf11d32ad369e69cf47eb0660b7276aa906f6 Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Mon, 4 Mar 2024 16:26:55 +0100 Subject: [PATCH 1/2] add grpc write stage --- pkg/api/write_grpc.go | 21 ++ pkg/config/config.go | 1 + pkg/pipeline/pipeline_builder.go | 2 + pkg/pipeline/write/grpc/client.go | 40 ++++ .../write/grpc/genericmap/genericmap.pb.go | 209 ++++++++++++++++++ .../grpc/genericmap/genericmap_grpc.pb.go | 105 +++++++++ pkg/pipeline/write/grpc/server.go | 77 +++++++ pkg/pipeline/write/write_grpc.go | 55 +++++ proto/README.md | 8 + proto/genericmap.proto | 19 ++ 10 files changed, 537 insertions(+) create mode 100644 pkg/api/write_grpc.go create mode 100644 pkg/pipeline/write/grpc/client.go create mode 100644 pkg/pipeline/write/grpc/genericmap/genericmap.pb.go create mode 100644 pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go create mode 100644 pkg/pipeline/write/grpc/server.go create mode 100644 pkg/pipeline/write/write_grpc.go create mode 100644 proto/README.md create mode 100644 proto/genericmap.proto diff --git a/pkg/api/write_grpc.go b/pkg/api/write_grpc.go new file mode 100644 index 000000000..a628a8993 --- /dev/null +++ b/pkg/api/write_grpc.go @@ -0,0 +1,21 @@ +package api + +import "errors" + +type WriteGRPC struct { + TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"the host name or IP of the target Flow collector"` + TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"the port of the target Flow collector"` +} + +func (w *WriteGRPC) Validate() error { + if w == nil { + return errors.New("you must provide a configuration") + } + if w.TargetHost == "" { + return errors.New("targetHost can't be empty") + } + if w.TargetPort == 0 { + return errors.New("targetPort can't be empty") + } + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index ea10e2045..25c08c269 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -129,6 +129,7 @@ type Write struct { Loki *api.WriteLoki `yaml:"loki,omitempty" json:"loki,omitempty"` Stdout *api.WriteStdout `yaml:"stdout,omitempty" json:"stdout,omitempty"` Ipfix *api.WriteIpfix `yaml:"ipfix,omitempty" json:"ipfix,omitempty"` + GRPC *api.WriteGRPC `yaml:"grpc,omitempty" json:"grpc,omitempty"` } // ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index 769a2a396..6577b90d8 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -368,6 +368,8 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write. var writer write.Writer var err error switch params.Write.Type { + case api.GRPCType: + writer, err = write.NewWriteGRPC(params) case api.StdoutType: writer, err = write.NewWriteStdout(params) case api.NoneType: diff --git a/pkg/pipeline/write/grpc/client.go b/pkg/pipeline/write/grpc/client.go new file mode 100644 index 000000000..082b46ce9 --- /dev/null +++ b/pkg/pipeline/write/grpc/client.go @@ -0,0 +1,40 @@ +package grpc + +import ( + "flag" + "log" + + pb "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" + "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// ClientConnection wraps a gRPC+protobuf connection +type ClientConnection struct { + client pb.CollectorClient + conn *grpc.ClientConn +} + +func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) { + flag.Parse() + // Set up a connection to the server. + socket := utils.GetSocket(hostIP, hostPort) + conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + + return &ClientConnection{ + client: pb.NewCollectorClient(conn), + conn: conn, + }, nil +} + +func (cp *ClientConnection) Client() pb.CollectorClient { + return cp.client +} + +func (cp *ClientConnection) Close() error { + return cp.conn.Close() +} diff --git a/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go b/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go new file mode 100644 index 000000000..a3ba1db8a --- /dev/null +++ b/pkg/pipeline/write/grpc/genericmap/genericmap.pb.go @@ -0,0 +1,209 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.6 +// source: proto/genericmap.proto + +package genericmap + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// The request message containing the GenericMap +type Flow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + GenericMap *anypb.Any `protobuf:"bytes,1,opt,name=genericMap,proto3" json:"genericMap,omitempty"` +} + +func (x *Flow) Reset() { + *x = Flow{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_genericmap_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Flow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Flow) ProtoMessage() {} + +func (x *Flow) ProtoReflect() protoreflect.Message { + mi := &file_proto_genericmap_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Flow.ProtoReflect.Descriptor instead. +func (*Flow) Descriptor() ([]byte, []int) { + return file_proto_genericmap_proto_rawDescGZIP(), []int{0} +} + +func (x *Flow) GetGenericMap() *anypb.Any { + if x != nil { + return x.GenericMap + } + return nil +} + +// intentionally empty +type CollectorReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CollectorReply) Reset() { + *x = CollectorReply{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_genericmap_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CollectorReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectorReply) ProtoMessage() {} + +func (x *CollectorReply) ProtoReflect() protoreflect.Message { + mi := &file_proto_genericmap_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CollectorReply.ProtoReflect.Descriptor instead. +func (*CollectorReply) Descriptor() ([]byte, []int) { + return file_proto_genericmap_proto_rawDescGZIP(), []int{1} +} + +var File_proto_genericmap_proto protoreflect.FileDescriptor + +var file_proto_genericmap_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x6d, + 0x61, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, + 0x63, 0x6d, 0x61, 0x70, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x3c, 0x0a, 0x04, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x34, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, + 0x69, 0x63, 0x4d, 0x61, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, + 0x79, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x4d, 0x61, 0x70, 0x22, 0x10, 0x0a, + 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x32, + 0x43, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x36, 0x0a, 0x04, + 0x53, 0x65, 0x6e, 0x64, 0x12, 0x10, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x6d, 0x61, + 0x70, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x1a, 0x1a, 0x2e, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, + 0x6d, 0x61, 0x70, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0e, 0x5a, 0x0c, 0x2e, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, + 0x63, 0x6d, 0x61, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_genericmap_proto_rawDescOnce sync.Once + file_proto_genericmap_proto_rawDescData = file_proto_genericmap_proto_rawDesc +) + +func file_proto_genericmap_proto_rawDescGZIP() []byte { + file_proto_genericmap_proto_rawDescOnce.Do(func() { + file_proto_genericmap_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_genericmap_proto_rawDescData) + }) + return file_proto_genericmap_proto_rawDescData +} + +var file_proto_genericmap_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_genericmap_proto_goTypes = []interface{}{ + (*Flow)(nil), // 0: genericmap.Flow + (*CollectorReply)(nil), // 1: genericmap.CollectorReply + (*anypb.Any)(nil), // 2: google.protobuf.Any +} +var file_proto_genericmap_proto_depIdxs = []int32{ + 2, // 0: genericmap.Flow.genericMap:type_name -> google.protobuf.Any + 0, // 1: genericmap.Collector.Send:input_type -> genericmap.Flow + 1, // 2: genericmap.Collector.Send:output_type -> genericmap.CollectorReply + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_genericmap_proto_init() } +func file_proto_genericmap_proto_init() { + if File_proto_genericmap_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_genericmap_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Flow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_genericmap_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CollectorReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_genericmap_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_genericmap_proto_goTypes, + DependencyIndexes: file_proto_genericmap_proto_depIdxs, + MessageInfos: file_proto_genericmap_proto_msgTypes, + }.Build() + File_proto_genericmap_proto = out.File + file_proto_genericmap_proto_rawDesc = nil + file_proto_genericmap_proto_goTypes = nil + file_proto_genericmap_proto_depIdxs = nil +} diff --git a/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go b/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go new file mode 100644 index 000000000..12a8d8aaf --- /dev/null +++ b/pkg/pipeline/write/grpc/genericmap/genericmap_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.6 +// source: proto/genericmap.proto + +package genericmap + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// CollectorClient is the client API for Collector service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CollectorClient interface { + Send(ctx context.Context, in *Flow, opts ...grpc.CallOption) (*CollectorReply, error) +} + +type collectorClient struct { + cc grpc.ClientConnInterface +} + +func NewCollectorClient(cc grpc.ClientConnInterface) CollectorClient { + return &collectorClient{cc} +} + +func (c *collectorClient) Send(ctx context.Context, in *Flow, opts ...grpc.CallOption) (*CollectorReply, error) { + out := new(CollectorReply) + err := c.cc.Invoke(ctx, "/genericmap.Collector/Send", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CollectorServer is the server API for Collector service. +// All implementations must embed UnimplementedCollectorServer +// for forward compatibility +type CollectorServer interface { + Send(context.Context, *Flow) (*CollectorReply, error) + mustEmbedUnimplementedCollectorServer() +} + +// UnimplementedCollectorServer must be embedded to have forward compatible implementations. +type UnimplementedCollectorServer struct { +} + +func (UnimplementedCollectorServer) Send(context.Context, *Flow) (*CollectorReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedCollectorServer) mustEmbedUnimplementedCollectorServer() {} + +// UnsafeCollectorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CollectorServer will +// result in compilation errors. +type UnsafeCollectorServer interface { + mustEmbedUnimplementedCollectorServer() +} + +func RegisterCollectorServer(s grpc.ServiceRegistrar, srv CollectorServer) { + s.RegisterService(&Collector_ServiceDesc, srv) +} + +func _Collector_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Flow) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CollectorServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/genericmap.Collector/Send", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CollectorServer).Send(ctx, req.(*Flow)) + } + return interceptor(ctx, in, info, handler) +} + +// Collector_ServiceDesc is the grpc.ServiceDesc for Collector service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Collector_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "genericmap.Collector", + HandlerType: (*CollectorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Send", + Handler: _Collector_Send_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/genericmap.proto", +} diff --git a/pkg/pipeline/write/grpc/server.go b/pkg/pipeline/write/grpc/server.go new file mode 100644 index 000000000..0a085e924 --- /dev/null +++ b/pkg/pipeline/write/grpc/server.go @@ -0,0 +1,77 @@ +package grpc + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" +) + +// CollectorServer wraps a Flow Collector connection & session +type CollectorServer struct { + grpcServer *grpc.Server +} + +type collectorOptions struct { + grpcServerOptions []grpc.ServerOption +} + +// CollectorOption allows overriding the default configuration of the CollectorServer instance. +// Use them in the StartCollector function. +type CollectorOption func(options *collectorOptions) + +func WithGRPCServerOptions(options ...grpc.ServerOption) CollectorOption { + return func(copt *collectorOptions) { + copt.grpcServerOptions = options + } +} + +// StartCollector listens in background for gRPC+Protobuf flows in the given port, and forwards each +// set of *genericmap.Flow by the provided channel. +func StartCollector( + port int, recordForwarder chan<- *genericmap.Flow, options ...CollectorOption, +) (*CollectorServer, error) { + copts := collectorOptions{} + for _, opt := range options { + opt(&copts) + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(copts.grpcServerOptions...) + genericmap.RegisterCollectorServer(grpcServer, &collectorAPI{ + recordForwarder: recordForwarder, + }) + reflection.Register(grpcServer) + go func() { + if err := grpcServer.Serve(lis); err != nil { + panic("error connecting to server: " + err.Error()) + } + }() + return &CollectorServer{ + grpcServer: grpcServer, + }, nil +} + +func (c *CollectorServer) Close() error { + c.grpcServer.Stop() + return nil +} + +type collectorAPI struct { + genericmap.UnimplementedCollectorServer + recordForwarder chan<- *genericmap.Flow +} + +var okReply = &genericmap.CollectorReply{} + +func (c *collectorAPI) Send(_ context.Context, records *genericmap.Flow) (*genericmap.CollectorReply, error) { + c.recordForwarder <- records + return okReply, nil +} diff --git a/pkg/pipeline/write/write_grpc.go b/pkg/pipeline/write/write_grpc.go new file mode 100644 index 000000000..c89d5f361 --- /dev/null +++ b/pkg/pipeline/write/write_grpc.go @@ -0,0 +1,55 @@ +package write + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/anypb" +) + +type writeGRPC struct { + hostIP string + hostPort int + clientConn *grpc.ClientConnection +} + +// Write writes a flow before being stored +func (t *writeGRPC) Write(v config.GenericMap) { + logrus.Tracef("entering writeGRPC Write %s", v) + value, _ := json.Marshal(v) + if _, err := t.clientConn.Client().Send(context.TODO(), &genericmap.Flow{ + GenericMap: &anypb.Any{ + Value: value, + }, + }); err != nil { + logrus.Errorf("writeGRPC send error: %v", err) + } +} + +// NewWriteGRPC create a new write +func NewWriteGRPC(params config.StageParam) (Writer, error) { + logrus.Debugf("entering NewWriteGRPC") + + writeGRPC := &writeGRPC{} + if params.Write != nil && params.Write.GRPC != nil { + if err := params.Write.GRPC.Validate(); err != nil { + return nil, fmt.Errorf("the provided config is not valid: %w", err) + } + writeGRPC.hostIP = params.Write.GRPC.TargetHost + writeGRPC.hostPort = params.Write.GRPC.TargetPort + } else { + return nil, fmt.Errorf("write.grpc param is mandatory: %v", params.Write) + } + logrus.Debugf("NewWriteGRPC ConnectClient %s:%d...", writeGRPC.hostIP, writeGRPC.hostPort) + clientConn, err := grpc.ConnectClient(writeGRPC.hostIP, writeGRPC.hostPort) + if err != nil { + return nil, err + } + writeGRPC.clientConn = clientConn + return writeGRPC, nil +} diff --git a/proto/README.md b/proto/README.md new file mode 100644 index 000000000..78fbc764a --- /dev/null +++ b/proto/README.md @@ -0,0 +1,8 @@ +# Update genericmap gRPC + +Run the following commands to update `genericmap.pb.go` and `genericmap_grpc.pb.go`: + +```bash +$ protoc --go_out=./pkg/pipeline/write/grpc ./proto/genericmap.proto +$ protoc --go-grpc_out=./pkg/pipeline/write/grpc ./proto/genericmap.proto +``` \ No newline at end of file diff --git a/proto/genericmap.proto b/proto/genericmap.proto new file mode 100644 index 000000000..54513d17a --- /dev/null +++ b/proto/genericmap.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "./genericmap"; + +package genericmap; + +import "google/protobuf/any.proto"; + +service Collector { + rpc Send (Flow) returns (CollectorReply) {} +} + +// The request message containing the GenericMap +message Flow { + google.protobuf.Any genericMap = 1; +} + +// intentionally empty +message CollectorReply {} \ No newline at end of file From bc514f2c39e350261c4397468b299db2e68f9a67 Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Wed, 6 Mar 2024 12:28:03 +0100 Subject: [PATCH 2/2] add testing --- pkg/pipeline/write/grpc/grpc_test.go | 94 +++++++++++++++++++++++++++ pkg/pipeline/write/write_grpc_test.go | 42 ++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 pkg/pipeline/write/grpc/grpc_test.go create mode 100644 pkg/pipeline/write/write_grpc_test.go diff --git a/pkg/pipeline/write/grpc/grpc_test.go b/pkg/pipeline/write/grpc/grpc_test.go new file mode 100644 index 000000000..226c0b731 --- /dev/null +++ b/pkg/pipeline/write/grpc/grpc_test.go @@ -0,0 +1,94 @@ +package grpc + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/mariomac/guara/pkg/test" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" +) + +const timeout = 5 * time.Second + +func TestGRPCCommunication(t *testing.T) { + port, err := test.FreeTCPPort() + require.NoError(t, err) + serverOut := make(chan *genericmap.Flow) + _, err = StartCollector(port, serverOut) + require.NoError(t, err) + cc, err := ConnectClient("127.0.0.1", port) + require.NoError(t, err) + client := cc.Client() + + gm := config.GenericMap{ + "test": "test", + } + value, err := json.Marshal(gm) + require.NoError(t, err) + + go func() { + _, err = client.Send(context.Background(), + &genericmap.Flow{ + GenericMap: &anypb.Any{ + Value: value, + }, + }) + require.NoError(t, err) + }() + + var rs *genericmap.Flow + select { + case rs = <-serverOut: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for flows") + } + assert.NotNil(t, rs.GenericMap) + assert.EqualValues(t, value, rs.GenericMap.Value) + select { + case rs = <-serverOut: + assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs) + default: + //ok! + } +} + +func TestConstructorOptions(t *testing.T) { + port, err := test.FreeTCPPort() + require.NoError(t, err) + intercepted := make(chan struct{}) + // Override the default GRPC collector to verify that StartCollector is applying the + // passed options + _, err = StartCollector(port, make(chan *genericmap.Flow), + WithGRPCServerOptions(grpc.UnaryInterceptor(func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (resp interface{}, err error) { + close(intercepted) + return handler(ctx, req) + }))) + require.NoError(t, err) + cc, err := ConnectClient("127.0.0.1", port) + require.NoError(t, err) + client := cc.Client() + + go func() { + _, err = client.Send(context.Background(), + &genericmap.Flow{GenericMap: &anypb.Any{}}) + require.NoError(t, err) + }() + + select { + case <-intercepted: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for unary interceptor to work") + } +} diff --git a/pkg/pipeline/write/write_grpc_test.go b/pkg/pipeline/write/write_grpc_test.go new file mode 100644 index 000000000..fddd07f77 --- /dev/null +++ b/pkg/pipeline/write/write_grpc_test.go @@ -0,0 +1,42 @@ +package write + +import ( + "testing" + + "github.com/mariomac/guara/pkg/test" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc" + "github.com/stretchr/testify/require" +) + +func Test_WriteGRPC(t *testing.T) { + port, err := test.FreeTCPPort() + require.NoError(t, err) + cc, err := grpc.ConnectClient("127.0.0.1", port) + require.NoError(t, err) + ws := writeGRPC{ + hostIP: "127.0.0.1", + hostPort: port, + clientConn: cc, + } + ws.Write(config.GenericMap{"key": "test"}) +} + +func Test_NewWriteGRPC(t *testing.T) { + writer, err := NewWriteGRPC(config.StageParam{}) + require.Nil(t, writer) + require.Error(t, err) + + writeParams := api.WriteGRPC{ + TargetHost: "target", + TargetPort: 1234, + } + writer, err = NewWriteGRPC(config.StageParam{ + Write: &config.Write{ + GRPC: &writeParams, + }, + }) + require.Nil(t, err) + require.NotNil(t, writer) +}