diff --git a/factory/publisherFactory.go b/factory/publisherFactory.go index c5d3fd0..3a35a4d 100644 --- a/factory/publisherFactory.go +++ b/factory/publisherFactory.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-ws-connector-firehose-go/config" "github.com/multiversx/mx-chain-ws-connector-firehose-go/process" "github.com/multiversx/mx-chain-ws-connector-firehose-go/server" + "google.golang.org/grpc" ) // CreatePublisher will return the required Publisher implementation based on whether the hyperOutportBlock are @@ -23,7 +24,9 @@ func CreatePublisher( return nil, fmt.Errorf("failed to create grpc blocks handler: %w", err) } - s, err := server.New(cfg.GRPC, handler) + grpcServer := grpc.NewServer() + + s, err := server.NewGRPCServerWrapper(grpcServer, cfg.GRPC, handler) if err != nil { return nil, fmt.Errorf("failed to create grpc server: %w", err) } diff --git a/server/grpcServer.go b/server/grpcServer.go index 103f7f6..823fd08 100644 --- a/server/grpcServer.go +++ b/server/grpcServer.go @@ -6,7 +6,6 @@ import ( "net" logger "github.com/multiversx/mx-chain-logger-go" - "google.golang.org/grpc" "google.golang.org/grpc/reflection" "github.com/multiversx/mx-chain-ws-connector-firehose-go/config" @@ -19,34 +18,42 @@ var ( log = logger.GetOrCreate("server") ) -type grpcServer struct { - server *grpc.Server +type grpcServer interface { + reflection.GRPCServer + Serve(lis net.Listener) error + GracefulStop() +} + +type grpcServerWrapper struct { + server grpcServer config config.GRPCConfig cancelFunc context.CancelFunc } -// New instantiates the underlying grpc server handling rpc requests. -func New(config config.GRPCConfig, blocksHandler process.GRPCBlocksHandler) (*grpcServer, error) { - s := grpc.NewServer() - +// NewGRPCServerWrapper instantiates the underlying grpc server handling rpc requests. +func NewGRPCServerWrapper( + grpcServer grpcServer, + config config.GRPCConfig, + blocksHandler process.GRPCBlocksHandler, +) (*grpcServerWrapper, error) { ctx, cancelFunc := context.WithCancel(context.Background()) service, err := hyperOutportBlock.NewService(ctx, blocksHandler) if err != nil { return nil, fmt.Errorf("failed to create service: %w", err) } - data.RegisterBlockStreamServer(s, service) - reflection.Register(s) + data.RegisterBlockStreamServer(grpcServer, service) + reflection.Register(grpcServer) - return &grpcServer{ - server: s, + return &grpcServerWrapper{ + server: grpcServer, config: config, cancelFunc: cancelFunc, }, nil } // Start will start the grpc server on the configured URL. -func (s *grpcServer) Start() { +func (s *grpcServerWrapper) Start() { go func() { err := s.run() if err != nil { @@ -55,7 +62,7 @@ func (s *grpcServer) Start() { }() } -func (s *grpcServer) run() error { +func (s *grpcServerWrapper) run() error { lis, err := net.Listen("tcp", s.config.URL) if err != nil { return fmt.Errorf("failed to listen: %v", err) @@ -69,14 +76,15 @@ func (s *grpcServer) run() error { } // Close will gracefully stop the grpc server. -func (s *grpcServer) Close() { +func (s *grpcServerWrapper) Close() { if s.cancelFunc != nil { s.cancelFunc() } + s.server.GracefulStop() } // IsInterfaceNil checks if the underlying server is nil. -func (s *grpcServer) IsInterfaceNil() bool { +func (s *grpcServerWrapper) IsInterfaceNil() bool { return s == nil } diff --git a/server/grpcServer_test.go b/server/grpcServer_test.go new file mode 100644 index 0000000..d6877f1 --- /dev/null +++ b/server/grpcServer_test.go @@ -0,0 +1,45 @@ +package server_test + +import ( + "net" + "testing" + + "github.com/multiversx/mx-chain-ws-connector-firehose-go/config" + "github.com/multiversx/mx-chain-ws-connector-firehose-go/server" + "github.com/multiversx/mx-chain-ws-connector-firehose-go/testscommon" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type GRPCServerStub struct{} + +func (g *GRPCServerStub) RegisterService(desc *grpc.ServiceDesc, impl any) { +} + +func (g *GRPCServerStub) GetServiceInfo() map[string]grpc.ServiceInfo { + return make(map[string]grpc.ServiceInfo) +} + +func (g *GRPCServerStub) Serve(lis net.Listener) error { + return nil +} + +func (g *GRPCServerStub) GracefulStop() { +} + +func TestNewGRPCServerWrapper(t *testing.T) { + t.Parallel() + + gsv, err := server.NewGRPCServerWrapper( + &GRPCServerStub{}, + config.GRPCConfig{ + URL: "localhost:8081", + }, + &testscommon.GRPCBlocksHandlerStub{}, + ) + require.Nil(t, err) + require.False(t, gsv.IsInterfaceNil()) + + gsv.Start() + gsv.Close() +}