diff --git a/connector/connectorRunner.go b/connector/connectorRunner.go index 8e89886..ca5695d 100644 --- a/connector/connectorRunner.go +++ b/connector/connectorRunner.go @@ -101,7 +101,6 @@ func (cr *connectorRunner) Run() error { DataAggregator: dataAggregator, RetryDurationInMilliseconds: cr.config.Publisher.RetryDurationInMiliseconds, Marshalizer: protoMarshaller, - FirstCommitableBlocks: firstCommitableBlocks, } publisherHandler, err := process.NewPublisherHandler(publisherHandlerArgs) @@ -114,6 +113,7 @@ func (cr *connectorRunner) Run() error { gogoProtoMarshaller, blocksPool, outportBlockConverter, + firstCommitableBlocks, ) if err != nil { return fmt.Errorf("cannot create ws firehose data processor, error: %w", err) diff --git a/process/dataProcessor.go b/process/dataProcessor.go index b0e3aca..e06bdb1 100644 --- a/process/dataProcessor.go +++ b/process/dataProcessor.go @@ -15,6 +15,7 @@ type dataProcessor struct { publisher Publisher outportBlocksPool BlocksPool outportBlockConverter OutportBlockConverter + firstCommitableBlocks map[uint32]uint64 } // NewDataProcessor creates a data processor able to receive data from a ws outport driver and handle blocks @@ -23,6 +24,7 @@ func NewDataProcessor( marshaller marshal.Marshalizer, blocksPool BlocksPool, outportBlockConverter OutportBlockConverter, + firstCommitableBlocks map[uint32]uint64, ) (DataProcessor, error) { if check.IfNil(publisher) { return nil, ErrNilPublisher @@ -36,12 +38,16 @@ func NewDataProcessor( if check.IfNil(outportBlockConverter) { return nil, ErrNilOutportBlocksConverter } + if firstCommitableBlocks == nil { + return nil, ErrNilFirstCommitableBlocks + } dp := &dataProcessor{ marshaller: marshaller, publisher: publisher, outportBlocksPool: blocksPool, outportBlockConverter: outportBlockConverter, + firstCommitableBlocks: firstCommitableBlocks, } dp.operationHandlers = map[string]func(marshalledData []byte) error{ @@ -107,6 +113,20 @@ func (dp *dataProcessor) handleMetaOutportBlock(outportBlock *outport.OutportBlo return fmt.Errorf("failed to put metablock: %w", err) } + shardID := metaOutportBlock.GetShardID() + firstCommitableBlock, ok := dp.firstCommitableBlocks[shardID] + if !ok { + return fmt.Errorf("failed to get first commitable block for shard %d", shardID) + } + + if metaNonce < firstCommitableBlock { + // do not try to aggregate or publish hyper outport block + + log.Trace("do not publish block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock) + + return nil + } + err = dp.publisher.PublishBlock(headerHash) if err != nil { return fmt.Errorf("failed to publish block: %w", err) diff --git a/process/publisherHandler.go b/process/publisherHandler.go index fef68ed..7227608 100644 --- a/process/publisherHandler.go +++ b/process/publisherHandler.go @@ -20,12 +20,11 @@ const ( ) type publisherHandler struct { - handler HyperBlockPublisher - outportBlocksPool BlocksPool - dataAggregator DataAggregator - retryDuration time.Duration - marshaller marshal.Marshalizer - firstCommitableBlocks map[uint32]uint64 + handler HyperBlockPublisher + outportBlocksPool BlocksPool + dataAggregator DataAggregator + retryDuration time.Duration + marshaller marshal.Marshalizer blocksChan chan []byte cancelFunc func() @@ -42,7 +41,6 @@ type PublisherHandlerArgs struct { DataAggregator DataAggregator RetryDurationInMilliseconds uint64 Marshalizer marshal.Marshalizer - FirstCommitableBlocks map[uint32]uint64 } // NewPublisherHandler creates a new publisher handler component @@ -59,24 +57,20 @@ func NewPublisherHandler(args PublisherHandlerArgs) (*publisherHandler, error) { if check.IfNil(args.Marshalizer) { return nil, ErrNilMarshaller } - if args.FirstCommitableBlocks == nil { - return nil, ErrNilFirstCommitableBlocks - } if args.RetryDurationInMilliseconds < minRetryDurationInMilliseconds { return nil, fmt.Errorf("%w for retry duration: provided %d, min required %d", ErrInvalidValue, args.RetryDurationInMilliseconds, minRetryDurationInMilliseconds) } ph := &publisherHandler{ - handler: args.Handler, - outportBlocksPool: args.OutportBlocksPool, - dataAggregator: args.DataAggregator, - marshaller: args.Marshalizer, - retryDuration: time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond, - firstCommitableBlocks: args.FirstCommitableBlocks, - checkpoint: &data.PublishCheckpoint{}, - blocksChan: make(chan []byte), - closeChan: make(chan struct{}), + handler: args.Handler, + outportBlocksPool: args.OutportBlocksPool, + dataAggregator: args.DataAggregator, + marshaller: args.Marshalizer, + retryDuration: time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond, + checkpoint: &data.PublishCheckpoint{}, + blocksChan: make(chan []byte), + closeChan: make(chan struct{}), } var ctx context.Context @@ -224,22 +218,6 @@ func (ph *publisherHandler) handlerHyperOutportBlock(headerHash []byte) error { return err } - metaNonce := metaOutportBlock.BlockData.Header.GetNonce() - shardID := metaOutportBlock.GetShardID() - - firstCommitableBlock, ok := ph.firstCommitableBlocks[shardID] - if !ok { - return fmt.Errorf("failed to get first commitable block for shard %d", shardID) - } - - if metaNonce < firstCommitableBlock { - // do not try to aggregate or publish hyper outport block - - log.Trace("do not commit block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock) - - return nil - } - hyperOutportBlock, err := ph.dataAggregator.ProcessHyperBlock(metaOutportBlock) if err != nil { return err