Skip to content

Commit

Permalink
fixes after review.
Browse files Browse the repository at this point in the history
  • Loading branch information
cristure committed Apr 22, 2024
1 parent 2dbaf6e commit cabb4a7
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 121 deletions.
3 changes: 1 addition & 2 deletions cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

[DataPool]
# Should be smaller then PruningWindow
MaxDelta = 10
MaxDelta = 100
PruningWindow = 1000

# Defines the number of active persisters to keep open
Expand All @@ -50,5 +50,4 @@
MaxOpenFiles = 10

[GRPC]
Enable = true
URL = "localhost:8000"
5 changes: 5 additions & 0 deletions cmd/connector/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ var (
Usage: "Option for specifying db mode. Available options: `full-persister`, `import-db`, `optimized-persister`",
Value: "full-persister",
}

enableGrpcServer = cli.BoolFlag{
Name: "enable-grpc-server",
Usage: "Option for enabling grpc server",
}
)
5 changes: 4 additions & 1 deletion cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func startConnector(ctx *cli.Context) error {
dbMode := ctx.GlobalString(dbMode.Name)
log.Info("storer sync mode", "dbMode", dbMode)

connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode)
enableGrpcServer := ctx.GlobalBool(enableGrpcServer.Name)
log.Info("grpc server enabled", "enableGrpcServer", enableGrpcServer)

connectorRunner, err := connector.NewConnectorRunner(cfg, dbMode, enableGrpcServer)
if err != nil {
return fmt.Errorf("cannot create connector runner, error: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,5 @@ type DBConfig struct {

// GRPCConfig will map the gRPC server configuration
type GRPCConfig struct {
Enable bool
URL string
URL string
}
60 changes: 14 additions & 46 deletions connector/connectorRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/multiversx/mx-chain-core-go/marshal"
logger "github.com/multiversx/mx-chain-logger-go"
Expand All @@ -22,25 +21,28 @@ var log = logger.GetOrCreate("connectorRunner")
var ErrNilConfig = errors.New("nil configs provided")

type connectorRunner struct {
config *config.Config
dbMode string
config *config.Config
dbMode string
enableGrpcServer bool
}

// NewConnectorRunner will create a new connector runner instance
func NewConnectorRunner(cfg *config.Config, dbMode string) (*connectorRunner, error) {
func NewConnectorRunner(cfg *config.Config, dbMode string, enableGrpcServer bool) (*connectorRunner, error) {
if cfg == nil {
return nil, ErrNilConfig
}

return &connectorRunner{
config: cfg,
dbMode: dbMode,
config: cfg,
dbMode: dbMode,
enableGrpcServer: enableGrpcServer,
}, nil
}

// Run will trigger connector service
func (cr *connectorRunner) Run() error {
protoMarshaller := &marshal.GogoProtoMarshalizer{}
converter := process.NewOutportBlockConverter()

blockContainer, err := factory.CreateBlockContainer()
if err != nil {
Expand All @@ -57,30 +59,14 @@ func (cr *connectorRunner) Run() error {
return err
}

dataAggregator, err := process.NewDataAggregator(outportBlocksPool)
dataAggregator, err := process.NewDataAggregator(outportBlocksPool, converter)
if err != nil {
return err
}

var (
server *factory.GRPCServer
writer process.Writer
)
if cr.config.GRPC.Enable {
writer = &fakeWriter{}
handler, err := process.NewGRPCBlocksHandler(outportBlocksPool, dataAggregator)
if err != nil {
return fmt.Errorf("couldn't create grpc blocks handler, error: %w", err)
}
server = factory.NewServer(cr.config.GRPC, handler)

go func() {
if err := server.Start(); err != nil {
log.Error("couldn't start grpc server", "error", err)
}
}()
} else {
writer = os.Stdout
s, writer, err := factory.CreateGRPCServer(cr.enableGrpcServer, cr.config.GRPC, outportBlocksPool, dataAggregator)
if err != nil {
return err
}

publisher, err := process.NewFirehosePublisher(
Expand Down Expand Up @@ -121,27 +107,9 @@ func (cr *connectorRunner) Run() error {
log.Error(err.Error())
}

if server != nil {
server.Stop()
if s != nil {
s.Close()
}

return err
}

type fakeWriter struct {
err error
duration time.Duration
}

func (f *fakeWriter) Write(p []byte) (int, error) {
time.Sleep(f.duration)
if f.err != nil {
return 0, f.err
}

return len(p), nil
}

func (f *fakeWriter) Close() error {
return nil
}
29 changes: 0 additions & 29 deletions data/hyperOutportBlocks/type.go

This file was deleted.

64 changes: 35 additions & 29 deletions factory/grpcServerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,54 @@ package factory

import (
"fmt"
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"os"
"time"

"github.com/multiversx/mx-chain-ws-connector-template-go/config"
data "github.com/multiversx/mx-chain-ws-connector-template-go/data/hyperOutportBlocks"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
"github.com/multiversx/mx-chain-ws-connector-template-go/service/hyperOutportBlock"
"github.com/multiversx/mx-chain-ws-connector-template-go/server"
)

type GRPCServer struct {
server *grpc.Server
config config.GRPCConfig
}
// CreateGRPCServer will create the gRPC server along with the required handlers.
func CreateGRPCServer(
enableGrpcServer bool,
cfg config.GRPCConfig,
outportBlocksPool process.DataPool,
dataAggregator process.DataAggregator) (process.GRPCServer, process.Writer, error) {
if !enableGrpcServer {
return nil, os.Stdout, nil
}

// NewServer instantiates the underlying grpc server handling rpc requests.
func NewServer(config config.GRPCConfig, blocksHandler process.GRPCBlocksHandler) *GRPCServer {
s := grpc.NewServer()
handler, err := process.NewGRPCBlocksHandler(outportBlocksPool, dataAggregator)
if err != nil {
return nil, nil, fmt.Errorf("failed to create grpc blocks handler: %w", err)
}
s := server.New(cfg, handler)
err = s.Start()
if err != nil {
return nil, nil, fmt.Errorf("failed to start grpc server: %w", err)
}

service := hyperOutportBlock.NewService(blocksHandler)
data.RegisterHyperOutportBlockServiceServer(s, service)
reflection.Register(s)
return s, &fakeWriter{}, nil

return &GRPCServer{s, config}
}

// Start will start the grpc server on the configured URL.
func (s *GRPCServer) Start() error {
lis, err := net.Listen("tcp", s.config.URL)
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}
type fakeWriter struct {
err error
duration time.Duration
}

if err = s.server.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %v", err)
// Write is a mock writer.
func (f *fakeWriter) Write(p []byte) (int, error) {
time.Sleep(f.duration)
if f.err != nil {
return 0, f.err
}

return nil
return len(p), nil
}

// Stop will gracefully stop the grpc server.
func (s *GRPCServer) Stop() {
s.server.GracefulStop()
// Close is mock closer.
func (f *fakeWriter) Close() error {
return nil
}
6 changes: 5 additions & 1 deletion process/dataAggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ type dataAggregator struct {
// NewDataAggregator will create a new data aggregator instance
func NewDataAggregator(
blocksPool DataPool,
converter OutportBlockConverter,
) (*dataAggregator, error) {
if check.IfNil(blocksPool) {
return nil, ErrNilBlocksPool
}
if check.IfNil(converter) {
return nil, ErrNilOutportBlocksConverter
}

return &dataAggregator{
blocksPool: blocksPool,
converter: NewOutportBlockConverter(),
converter: converter,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions process/dataAggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestNewDataAggregator(t *testing.T) {
t.Run("nil blocks pool", func(t *testing.T) {
t.Parallel()

da, err := process.NewDataAggregator(nil)
da, err := process.NewDataAggregator(nil, nil)
require.Nil(t, da)
require.Equal(t, process.ErrNilBlocksPool, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

da, err := process.NewDataAggregator(&testscommon.BlocksPoolStub{})
da, err := process.NewDataAggregator(&testscommon.BlocksPoolStub{}, process.NewOutportBlockConverter())
require.Nil(t, err)
require.False(t, da.IsInterfaceNil())
})
Expand All @@ -39,7 +39,7 @@ func TestDataAggregator_ProcessHyperBlock(t *testing.T) {

blocksPoolStub := &testscommon.BlocksPoolStub{}

da, err := process.NewDataAggregator(blocksPoolStub)
da, err := process.NewDataAggregator(blocksPoolStub, process.NewOutportBlockConverter())
require.Nil(t, err)

shardOutportBlock := createOutportBlock()
Expand All @@ -65,7 +65,7 @@ func TestDataAggregator_ProcessHyperBlock(t *testing.T) {
expectedResult, err := converter.HandleShardOutportBlock(shardOutportBlock)
require.NoError(t, err)

da, err := process.NewDataAggregator(blocksPoolStub)
da, err := process.NewDataAggregator(blocksPoolStub, process.NewOutportBlockConverter())
require.Nil(t, err)

outportBlock := createMetaOutportBlock()
Expand Down
3 changes: 3 additions & 0 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var ErrNilPublisher = errors.New("nil publisher provided")
// ErrNilBlocksPool signals that a nil blocks pool was provided
var ErrNilBlocksPool = errors.New("nil blocks pool provided")

// ErrNilOutportBlocksConverter signals that a nil blocks pool was provided
var ErrNilOutportBlocksConverter = errors.New("nil outport blocks converter provided")

// ErrNilDataAggregator signals that a nil data aggregator was provided
var ErrNilDataAggregator = errors.New("nil data aggregator provided")

Expand Down
1 change: 1 addition & 0 deletions process/fakeWriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package process
6 changes: 6 additions & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ type PruningStorer interface {
type OutportBlockConverter interface {
HandleShardOutportBlock(outportBlock *outport.OutportBlock) (*data.ShardOutportBlock, error)
HandleMetaOutportBlock(outportBlock *outport.OutportBlock) (*data.MetaOutportBlock, error)
IsInterfaceNil() bool
}

// GRPCBlocksHandler defines the behaviour of handling block via gRPC
type GRPCBlocksHandler interface {
FetchHyperBlockByHash(hash []byte) (*data.HyperOutportBlock, error)
FetchHyperBlockByNonce(nonce uint64) (*data.HyperOutportBlock, error)
}

type GRPCServer interface {
Start() error
Close()
}
5 changes: 5 additions & 0 deletions process/outportBlockConverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,8 @@ func (o *outportBlockConverter) castBigInt(i *big.Int) ([]byte, error) {

return buf, err
}

// IsInterfaceNil returns nil if there is no value under the interface
func (o *outportBlockConverter) IsInterfaceNil() bool {
return o == nil
}
Loading

0 comments on commit cabb4a7

Please sign in to comment.