fix(concurrency): applying blocks concurrently can lead to unexpected errors (#700)

Co-authored-by: danwt <[email protected]>
mtsitrin and danwt authored Apr 21, 2024
1 parent d71f4e2 commit 7290af6
Showing 7 changed files with 82 additions and 61 deletions.
21 changes: 8 additions & 13 deletions block/block.go
@@ -118,34 +118,29 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
     return nil
 }
 
+// TODO: move to gossip.go
 func (m *Manager) attemptApplyCachedBlocks() error {
-    m.applyCachedBlockMutex.Lock()
-    defer m.applyCachedBlockMutex.Unlock()
+    m.retrieverMutex.Lock()
+    defer m.retrieverMutex.Unlock()
 
     for {
         expectedHeight := m.store.NextHeight()
 
-        prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
-        prevCachedCommit, commitExists := m.prevCommit[expectedHeight]
-
-        if !blockExists || !commitExists {
+        cachedBlock, blockExists := m.blockCache[expectedHeight]
+        if !blockExists {
             break
         }
 
         // Note: cached <block,commit> pairs have passed basic validation, so no need to validate again
-        err := m.applyBlock(prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
+        err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock})
         if err != nil {
             return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
         }
         m.logger.Debug("applied cached block", "height", expectedHeight)
-    }
 
-    for k := range m.prevBlock {
-        if k <= m.store.Height() {
-            delete(m.prevBlock, k)
-            delete(m.prevCommit, k)
-        }
+        delete(m.blockCache, cachedBlock.Block.Header.Height)
     }
 
     return nil
 }
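
The function above is essentially an in-order drain of a height-keyed cache: apply consecutively cached heights starting at the next expected one, evict each entry once applied, and stop at the first gap. A minimal stand-alone sketch of that pattern (illustrative only; drainCache, applyFn and cachedBlock are invented names, not dymint APIs):

package main

import "fmt"

type cachedBlock struct {
    payload string // stands in for the validated <block, commit> pair
}

// drainCache applies consecutively cached heights starting at nextHeight,
// deletes each entry after applying it, and stops at the first missing height.
func drainCache(cache map[uint64]cachedBlock, nextHeight uint64, applyFn func(uint64, cachedBlock) error) (uint64, error) {
    for {
        cb, ok := cache[nextHeight]
        if !ok {
            break // gap: wait for the missing height to arrive
        }
        if err := applyFn(nextHeight, cb); err != nil {
            return nextHeight, fmt.Errorf("apply cached block: expected height: %d: %w", nextHeight, err)
        }
        delete(cache, nextHeight) // keep only future heights in the cache
        nextHeight++
    }
    return nextHeight, nil
}

func main() {
    cache := map[uint64]cachedBlock{5: {"e"}, 6: {"f"}, 8: {"h"}} // height 7 is missing
    next, _ := drainCache(cache, 5, func(h uint64, cb cachedBlock) error {
        fmt.Println("applied height", h)
        return nil
    })
    fmt.Println("next expected height:", next) // 7; height 8 stays cached until 7 arrives
}
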
54 changes: 35 additions & 19 deletions block/manager.go
@@ -60,12 +60,26 @@ type Manager struct {
     // Block production
     shouldProduceBlocksCh chan bool
     produceEmptyBlockCh chan bool
-    produceBlockMutex sync.Mutex
-    applyCachedBlockMutex sync.Mutex
+    lastSubmissionTime atomic.Int64
+
+    /*
+        Guard against triggering a new batch submission when the old one is still going on (taking a while)
+    */
+    submitBatchMutex sync.Mutex
+
+    /*
+        Protect against producing two blocks at once if the first one is taking a while
+        Also, used to protect against the block production that occurs when batch submission thread
+        creates its empty block.
+    */
+    produceBlockMutex sync.Mutex
+
+    /*
+        Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
+        and incoming DA blocks, respectively.
+    */
+    retrieverMutex sync.Mutex
 
-    // batch submission
-    batchInProcess sync.Mutex
-    lastSubmissionTime atomic.Int64
     // pendingBatch is the result of the last DA submission
     // that is pending settlement layer submission.
     // It is used to avoid double submission of the same batch.
@@ -125,8 +138,7 @@ func NewManager(
         shouldProduceBlocksCh: make(chan bool, 1),
         produceEmptyBlockCh: make(chan bool, 1),
         logger: logger,
-        prevBlock: make(map[uint64]*types.Block),
-        prevCommit: make(map[uint64]*types.Commit),
+        blockCache: make(map[uint64]CachedBlock),
     }
 
     return agg, nil
@@ -223,9 +235,10 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
     m.shouldProduceBlocksCh <- eventData.Error == nil
 }
 
+// TODO: move to gossip.go
 // onNewGossippedBlock will take a block and apply it
 func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
-    m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
+    m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache))
     eventData := event.Data().(p2p.GossipedBlock)
     block := eventData.Block
     commit := eventData.Commit
@@ -236,17 +249,20 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
         return
     }
 
-    // if height is expected, apply
-    // if height is higher than expected (future block), cache
-    if block.Header.Height == m.store.NextHeight() {
-        err := m.applyBlock(&block, &commit, blockMetaData{source: gossipedBlock})
+    nextHeight := m.store.NextHeight()
+    if block.Header.Height >= nextHeight {
+        m.blockCache[block.Header.Height] = CachedBlock{
+            Block: &block,
+            Commit: &commit,
+        }
+        m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height())
+    }
+
+    if block.Header.Height == nextHeight {
+        err := m.attemptApplyCachedBlocks()
         if err != nil {
-            m.logger.Error("apply gossiped block", "err", err)
+            m.logger.Error("applying cached blocks", "err", err)
         }
-    } else if block.Header.Height > m.store.NextHeight() {
-        m.prevBlock[block.Header.Height] = &block
-        m.prevCommit[block.Header.Height] = &commit
-        m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height())
     }
 }
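
The struct comments above describe the key invariant: the gossip handler and the DA retriever are two independent goroutines, and retrieverMutex ensures only one of them touches the cache or applies blocks at a time. A rough, self-contained illustration of that shape (not dymint code; manager, onGossipedBlock and onDABlock are invented for the example):

package main

import (
    "fmt"
    "sync"
)

type manager struct {
    mu         sync.Mutex // plays the role of retrieverMutex
    nextHeight uint64
    cache      map[uint64]string // height-keyed, like blockCache
}

// onGossipedBlock mirrors the new flow: cache anything at or above the next
// expected height, then drain the cache if the expected height just arrived.
func (m *manager) onGossipedBlock(h uint64, b string) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if h >= m.nextHeight {
        m.cache[h] = b
    }
    m.drainLocked()
}

// onDABlock mirrors the DA path: apply the next block directly, then drop any
// gossiped copy of the same height from the cache.
func (m *manager) onDABlock(h uint64, b string) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if h == m.nextHeight {
        fmt.Println("applied from DA:", h)
        m.nextHeight++
    }
    delete(m.cache, h)
    m.drainLocked()
}

// drainLocked applies consecutive cached heights; the caller must hold m.mu.
func (m *manager) drainLocked() {
    for {
        if _, ok := m.cache[m.nextHeight]; !ok {
            return
        }
        fmt.Println("applied from cache:", m.nextHeight)
        delete(m.cache, m.nextHeight)
        m.nextHeight++
    }
}

func main() {
    m := &manager{nextHeight: 1, cache: map[uint64]string{}}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); m.onGossipedBlock(2, "g2"); m.onGossipedBlock(3, "g3") }()
    go func() { defer wg.Done(); m.onDABlock(1, "d1") }()
    wg.Wait()
    fmt.Println("next expected height:", m.nextHeight) // always 4, regardless of interleaving
}

Because both entry points serialize on the same mutex, the final state is the same no matter which goroutine runs first; without the shared lock the two paths could apply or cache the same height concurrently, which is the class of error the commit title refers to.
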
17 changes: 12 additions & 5 deletions block/retriever.go
@@ -62,6 +62,13 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error {
         }
     }
     m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)
+
+    // check for cached blocks
+    err := m.attemptApplyCachedBlocks()
+    if err != nil {
+        m.logger.Error("applying previous cached blocks", "err", err)
+    }
+
     return nil
 }
@@ -84,6 +91,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {
     m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)
 
+    m.retrieverMutex.Lock()
+    defer m.retrieverMutex.Unlock()
+
     for _, batch := range batchResp.Batches {
         for i, block := range batch.Blocks {
             if block.Header.Height != m.store.NextHeight() {
@@ -97,12 +107,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {
             if err != nil {
                 return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
             }
-        }
-    }
 
-    err := m.attemptApplyCachedBlocks()
-    if err != nil {
-        m.logger.Error("applying previous cached blocks", "err", err)
+            delete(m.blockCache, block.Header.Height)
+        }
     }
     return nil
 }
4 changes: 2 additions & 2 deletions block/submit.go
@@ -32,11 +32,11 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
 // Finally, it submits the next batch of blocks and updates the sync target to the height of
 // the last block in the submitted batch.
 func (m *Manager) handleSubmissionTrigger(ctx context.Context) {
-    if !m.batchInProcess.TryLock() { // Attempt to lock for batch processing
+    if !m.submitBatchMutex.TryLock() { // Attempt to lock for batch processing
         m.logger.Debug("Batch submission already in process, skipping submission")
         return
     }
-    defer m.batchInProcess.Unlock() // Ensure unlocking at the end
+    defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end
 
     // Load current sync target and height to determine if new blocks are available for submission.
     syncTarget, height := m.syncTarget.Load(), m.store.Height()
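
The guard in submit.go is intentionally non-blocking: a trigger that fires while a submission is still running is skipped rather than queued behind it, which is what sync.Mutex.TryLock (available since Go 1.18) provides. A small stand-alone sketch of that skip semantics (handleTrigger and the timings are invented for the example):

package main

import (
    "fmt"
    "sync"
    "time"
)

var submitMu sync.Mutex // plays the role of submitBatchMutex

// handleTrigger skips the whole submission when one is already in flight,
// instead of blocking and stacking a second submission behind the first.
func handleTrigger(id int) {
    if !submitMu.TryLock() {
        fmt.Printf("trigger %d: submission already in process, skipping\n", id)
        return
    }
    defer submitMu.Unlock()

    fmt.Printf("trigger %d: submitting batch\n", id)
    time.Sleep(50 * time.Millisecond) // pretend the submission takes a while
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) { defer wg.Done(); handleTrigger(id) }(i)
        time.Sleep(10 * time.Millisecond) // stagger the triggers
    }
    wg.Wait()
}
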
5 changes: 5 additions & 0 deletions block/types.go
@@ -23,3 +23,8 @@ type PendingBatch struct {
     daResult *da.ResultSubmitBatch
     batch *types.Batch
 }
+
+type CachedBlock struct {
+    Block *types.Block
+    Commit *types.Commit
+}
1 change: 1 addition & 0 deletions rpc/client/client.go
@@ -810,6 +810,7 @@ func (c *Client) CheckTx(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultChec
 }
 
 func (c *Client) eventsRoutine(sub tmtypes.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
+    defer close(outc)
     for {
         select {
         case msg := <-sub.Out():
41 changes: 19 additions & 22 deletions rpc/json/service.go
@@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
         return nil, fmt.Errorf("subscribe: %w", err)
     }
     go func(subscriptionID []byte) {
-        for {
-            select {
-            case msg := <-out:
-                // build the base response
-                var resp rpctypes.RPCResponse
-                // Check if subscriptionID is string or int and generate the rest of the response accordingly
-                subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
-                if err != nil {
-                    s.logger.Info("Failed to convert subscriptionID to int")
-                    resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
-                } else {
-                    resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
-                }
-                // Marshal response to JSON and send it to the websocket queue
-                jsonBytes, err := json.MarshalIndent(resp, "", " ")
-                if err != nil {
-                    s.logger.Error("marshal RPCResponse to JSON", "err", err)
-                    continue
-                }
-                if wsConn != nil {
-                    wsConn.queue <- jsonBytes
-                }
+        for msg := range out {
+            // build the base response
+            var resp rpctypes.RPCResponse
+            // Check if subscriptionID is string or int and generate the rest of the response accordingly
+            subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
+            if err != nil {
+                s.logger.Info("Failed to convert subscriptionID to int")
+                resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
+            } else {
+                resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
+            }
+            // Marshal response to JSON and send it to the websocket queue
+            jsonBytes, err := json.MarshalIndent(resp, "", " ")
+            if err != nil {
+                s.logger.Error("marshal RPCResponse to JSON", "err", err)
+                continue
+            }
+            if wsConn != nil {
+                wsConn.queue <- jsonBytes
+            }
         }
     }(subscriptionID)
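
The last two files work as a pair: client.go now closes outc when eventsRoutine returns, and the subscriber goroutine in service.go consumes it with for msg := range out, so it ends as soon as the producer closes the channel instead of waiting on a select that may never fire again. A minimal sketch of that shutdown handshake (produce and consume are invented names):

package main

import (
    "fmt"
    "sync"
)

// produce sends a few events and then closes the channel,
// mirroring the new `defer close(outc)` in eventsRoutine.
func produce(out chan<- string) {
    defer close(out)
    for _, ev := range []string{"event 1", "event 2", "event 3"} {
        out <- ev
    }
}

// consume ranges over the channel, mirroring `for msg := range out`:
// the loop ends when the producer closes the channel, so the goroutine
// does not leak after the subscription is torn down.
func consume(out <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for msg := range out {
        fmt.Println("forwarding to websocket queue:", msg)
    }
    fmt.Println("channel closed, subscriber goroutine exiting")
}

func main() {
    out := make(chan string)
    var wg sync.WaitGroup
    wg.Add(1)
    go consume(out, &wg)
    produce(out)
    wg.Wait()
}
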
