Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency): applying blocks concurrently can lead to unexpected errors #700

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,34 +118,29 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return nil
}

// TODO: move to gossip.go
func (m *Manager) attemptApplyCachedBlocks() error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()
m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()

prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

if !blockExists || !commitExists {
cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
break
}

// Note: cached <block,commit> pairs have passed basic validation, so no need to validate again
err := m.applyBlock(prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Debug("applied cached block", "height", expectedHeight)
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
delete(m.blockCache, cachedBlock.Block.Header.Height)
}

return nil
Copy link
Contributor Author

@mtsitrin mtsitrin Apr 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the pruning from each gossiped block, as it's not efficient, it goes over all the cached blocks.
it will be called when syncing the node

}

Expand Down
54 changes: 35 additions & 19 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,26 @@ type Manager struct {
// Block production
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex
lastSubmissionTime atomic.Int64

/*
Guard against triggering a new batch submission when the old one is still going on (taking a while)
*/
submitBatchMutex sync.Mutex

/*
Protect against producing two blocks at once if the first one is taking a while
Also, used to protect against the block production that occurs when batch submission thread
creates its empty block.
*/
produceBlockMutex sync.Mutex

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
*/
retrieverMutex sync.Mutex

// batch submission
batchInProcess sync.Mutex
lastSubmissionTime atomic.Int64
// pendingBatch is the result of the last DA submission
// that is pending settlement layer submission.
// It is used to avoid double submission of the same batch.
Expand All @@ -75,8 +89,7 @@ type Manager struct {
logger types.Logger

// Cached blocks and commits for applying at future heights. Invariant: the block and commit are .Valid() (validated sigs etc)
prevBlock map[uint64]*types.Block
prevCommit map[uint64]*types.Commit
blockCache map[uint64]CachedBlock
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -125,8 +138,7 @@ func NewManager(
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
prevBlock: make(map[uint64]*types.Block),
prevCommit: make(map[uint64]*types.Commit),
blockCache: make(map[uint64]CachedBlock),
}

return agg, nil
Expand Down Expand Up @@ -223,9 +235,10 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
m.shouldProduceBlocksCh <- eventData.Error == nil
}

// TODO: move to gossip.go
// onNewGossippedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
Expand All @@ -236,17 +249,20 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
return
}

// if height is expected, apply
// if height is higher than expected (future block), cache
if block.Header.Height == m.store.NextHeight() {
err := m.applyBlock(&block, &commit, blockMetaData{source: gossipedBlock})
nextHeight := m.store.NextHeight()
if block.Header.Height >= nextHeight {
m.blockCache[block.Header.Height] = CachedBlock{
Block: &block,
Commit: &commit,
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height())
}

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("apply gossiped block", "err", err)
m.logger.Error("applying cached blocks", "err", err)
}
} else if block.Header.Height > m.store.NextHeight() {
m.prevBlock[block.Header.Height] = &block
m.prevCommit[block.Header.Height] = &commit
m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height())
}
}

Expand Down
17 changes: 12 additions & 5 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error {
}
}
m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)

// check for cached blocks
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
}

return nil
}

Expand All @@ -84,6 +91,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)

m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
if block.Header.Height != m.store.NextHeight() {
Expand All @@ -97,12 +107,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {
if err != nil {
return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
}
}
}

err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
delete(m.blockCache, block.Header.Height)
}
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
// Finally, it submits the next batch of blocks and updates the sync target to the height of
// the last block in the submitted batch.
func (m *Manager) handleSubmissionTrigger(ctx context.Context) {
if !m.batchInProcess.TryLock() { // Attempt to lock for batch processing
if !m.submitBatchMutex.TryLock() { // Attempt to lock for batch processing
m.logger.Debug("Batch submission already in process, skipping submission")
return
}
defer m.batchInProcess.Unlock() // Ensure unlocking at the end
defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end

// Load current sync target and height to determine if new blocks are available for submission.
syncTarget, height := m.syncTarget.Load(), m.store.Height()
Expand Down
5 changes: 5 additions & 0 deletions block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ type PendingBatch struct {
daResult *da.ResultSubmitBatch
batch *types.Batch
}

type CachedBlock struct {
Block *types.Block
Commit *types.Commit
}
1 change: 1 addition & 0 deletions rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ func (c *Client) CheckTx(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultChec
}

func (c *Client) eventsRoutine(sub tmtypes.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
defer close(outc)
for {
select {
case msg := <-sub.Out():
Expand Down
41 changes: 19 additions & 22 deletions rpc/json/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
return nil, fmt.Errorf("subscribe: %w", err)
}
go func(subscriptionID []byte) {
for {
select {
case msg := <-out:
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
for msg := range out {
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
}
}(subscriptionID)
Expand Down
Loading