Skip to content

Commit

Permalink
moved first commitable blocks check to data processor
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Jun 12, 2024
1 parent e7293a6 commit 8d7930c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
2 changes: 1 addition & 1 deletion connector/connectorRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (cr *connectorRunner) Run() error {
DataAggregator: dataAggregator,
RetryDurationInMilliseconds: cr.config.Publisher.RetryDurationInMiliseconds,
Marshalizer: protoMarshaller,
FirstCommitableBlocks: firstCommitableBlocks,
}

publisherHandler, err := process.NewPublisherHandler(publisherHandlerArgs)
Expand All @@ -114,6 +113,7 @@ func (cr *connectorRunner) Run() error {
gogoProtoMarshaller,
blocksPool,
outportBlockConverter,
firstCommitableBlocks,
)
if err != nil {
return fmt.Errorf("cannot create ws firehose data processor, error: %w", err)
Expand Down
20 changes: 20 additions & 0 deletions process/dataProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type dataProcessor struct {
publisher Publisher
outportBlocksPool BlocksPool
outportBlockConverter OutportBlockConverter
firstCommitableBlocks map[uint32]uint64
}

// NewDataProcessor creates a data processor able to receive data from a ws outport driver and handle blocks
Expand All @@ -23,6 +24,7 @@ func NewDataProcessor(
marshaller marshal.Marshalizer,
blocksPool BlocksPool,
outportBlockConverter OutportBlockConverter,
firstCommitableBlocks map[uint32]uint64,
) (DataProcessor, error) {
if check.IfNil(publisher) {
return nil, ErrNilPublisher
Expand All @@ -36,12 +38,16 @@ func NewDataProcessor(
if check.IfNil(outportBlockConverter) {
return nil, ErrNilOutportBlocksConverter
}
if firstCommitableBlocks == nil {
return nil, ErrNilFirstCommitableBlocks
}

dp := &dataProcessor{
marshaller: marshaller,
publisher: publisher,
outportBlocksPool: blocksPool,
outportBlockConverter: outportBlockConverter,
firstCommitableBlocks: firstCommitableBlocks,
}

dp.operationHandlers = map[string]func(marshalledData []byte) error{
Expand Down Expand Up @@ -107,6 +113,20 @@ func (dp *dataProcessor) handleMetaOutportBlock(outportBlock *outport.OutportBlo
return fmt.Errorf("failed to put metablock: %w", err)
}

shardID := metaOutportBlock.GetShardID()
firstCommitableBlock, ok := dp.firstCommitableBlocks[shardID]
if !ok {
return fmt.Errorf("failed to get first commitable block for shard %d", shardID)
}

if metaNonce < firstCommitableBlock {
// do not try to aggregate or publish hyper outport block

log.Trace("do not publish block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock)

return nil
}

err = dp.publisher.PublishBlock(headerHash)
if err != nil {
return fmt.Errorf("failed to publish block: %w", err)
Expand Down
48 changes: 13 additions & 35 deletions process/publisherHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ const (
)

type publisherHandler struct {
handler HyperBlockPublisher
outportBlocksPool BlocksPool
dataAggregator DataAggregator
retryDuration time.Duration
marshaller marshal.Marshalizer
firstCommitableBlocks map[uint32]uint64
handler HyperBlockPublisher
outportBlocksPool BlocksPool
dataAggregator DataAggregator
retryDuration time.Duration
marshaller marshal.Marshalizer

blocksChan chan []byte
cancelFunc func()
Expand All @@ -42,7 +41,6 @@ type PublisherHandlerArgs struct {
DataAggregator DataAggregator
RetryDurationInMilliseconds uint64
Marshalizer marshal.Marshalizer
FirstCommitableBlocks map[uint32]uint64
}

// NewPublisherHandler creates a new publisher handler component
Expand All @@ -59,24 +57,20 @@ func NewPublisherHandler(args PublisherHandlerArgs) (*publisherHandler, error) {
if check.IfNil(args.Marshalizer) {
return nil, ErrNilMarshaller
}
if args.FirstCommitableBlocks == nil {
return nil, ErrNilFirstCommitableBlocks
}
if args.RetryDurationInMilliseconds < minRetryDurationInMilliseconds {
return nil, fmt.Errorf("%w for retry duration: provided %d, min required %d",
ErrInvalidValue, args.RetryDurationInMilliseconds, minRetryDurationInMilliseconds)
}

ph := &publisherHandler{
handler: args.Handler,
outportBlocksPool: args.OutportBlocksPool,
dataAggregator: args.DataAggregator,
marshaller: args.Marshalizer,
retryDuration: time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond,
firstCommitableBlocks: args.FirstCommitableBlocks,
checkpoint: &data.PublishCheckpoint{},
blocksChan: make(chan []byte),
closeChan: make(chan struct{}),
handler: args.Handler,
outportBlocksPool: args.OutportBlocksPool,
dataAggregator: args.DataAggregator,
marshaller: args.Marshalizer,
retryDuration: time.Duration(args.RetryDurationInMilliseconds) * time.Millisecond,
checkpoint: &data.PublishCheckpoint{},
blocksChan: make(chan []byte),
closeChan: make(chan struct{}),
}

var ctx context.Context
Expand Down Expand Up @@ -224,22 +218,6 @@ func (ph *publisherHandler) handlerHyperOutportBlock(headerHash []byte) error {
return err
}

metaNonce := metaOutportBlock.BlockData.Header.GetNonce()
shardID := metaOutportBlock.GetShardID()

firstCommitableBlock, ok := ph.firstCommitableBlocks[shardID]
if !ok {
return fmt.Errorf("failed to get first commitable block for shard %d", shardID)
}

if metaNonce < firstCommitableBlock {
// do not try to aggregate or publish hyper outport block

log.Trace("do not commit block", "currentNonce", metaNonce, "firstCommitableNonce", firstCommitableBlock)

return nil
}

hyperOutportBlock, err := ph.dataAggregator.ProcessHyperBlock(metaOutportBlock)
if err != nil {
return err
Expand Down

0 comments on commit 8d7930c

Please sign in to comment.