diff --git a/arbitrum/handler_p2p.go b/arbitrum/handler_p2p.go index 44e12b8dd4..31ef260a80 100644 --- a/arbitrum/handler_p2p.go +++ b/arbitrum/handler_p2p.go @@ -39,10 +39,10 @@ import ( ) type SyncHelper interface { - LastConfirmed() (*types.Header, uint64, error) + LastConfirmed() (*types.Header, uint64, uint64, error) LastCheckpoint() (*types.Header, error) CheckpointSupported(*types.Header) (bool, error) - ValidateConfirmed(*types.Header, uint64) (bool, error) + ValidateConfirmed(*types.Header, uint64, uint64) (bool, error) } type Peer struct { @@ -271,7 +271,7 @@ func (h *arbHandler) PeerInfo(id enode.ID) interface{} { return nil } -func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, node uint64) { +func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header, l1BlockNumber uint64, node uint64) { protoHandler := (*protocolHandler)(h) validated := false valid := false @@ -284,7 +284,7 @@ func (h *arbHandler) HandleLastConfirmed(peer *arb.Peer, confirmed *types.Header } if !validated { var err error - valid, err = h.helper.ValidateConfirmed(confirmed, node) + valid, err = h.helper.ValidateConfirmed(confirmed, l1BlockNumber, node) if err != nil { log.Error("error in validate confirmed", "id", peer.ID(), "err", err) return @@ -343,7 +343,7 @@ func (h *arbHandler) HandleCheckpoint(peer *arb.Peer, checkpoint *types.Header, protoHandler.advanceCheckpoint(checkpoint) } -func (h *arbHandler) LastConfirmed() (*types.Header, uint64, error) { +func (h *arbHandler) LastConfirmed() (*types.Header, uint64, uint64, error) { return h.helper.LastConfirmed() } diff --git a/arbitrum/sync_test.go b/arbitrum/sync_test.go index 15838b3b76..e7d9487d0b 100644 --- a/arbitrum/sync_test.go +++ b/arbitrum/sync_test.go @@ -61,8 +61,8 @@ type dummySyncHelper struct { checkpoint *types.Header } -func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, error) { - return d.confirmed, 0, nil +func (d *dummySyncHelper) LastConfirmed() (*types.Header, uint64, uint64, error) { + return d.confirmed, 0, 0, nil } func (d *dummySyncHelper) LastCheckpoint() (*types.Header, error) { @@ -76,7 +76,7 @@ func (d *dummySyncHelper) CheckpointSupported(*types.Header) (bool, error) { return true, nil } -func (d *dummySyncHelper) ValidateConfirmed(header *types.Header, node uint64) (bool, error) { +func (d *dummySyncHelper) ValidateConfirmed(header *types.Header, l1BlockNumber uint64, node uint64) (bool, error) { if d.confirmed == nil { return true, nil } diff --git a/core/blockchain.go b/core/blockchain.go index 35b5394676..efec7b8bb0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -284,11 +284,26 @@ type BlockChain struct { numberOfBlocksToSkipStateSaving uint32 amountOfGasInBlocksToSkipStateSaving uint64 + forceTriedbCommitHook ForceTriedbCommitHook + processingSinceLastForceCommit time.Duration + gasSinceLastForceCommit uint64 } +type ForceTriedbCommitHook func(*types.Block, time.Duration, uint64) bool + type trieGcEntry struct { Root common.Hash Timestamp uint64 + GasUsed uint64 +} + +func NewArbBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, genesis *Genesis, overrides *ChainOverrides, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64, forceTriedbCommitHook ForceTriedbCommitHook) (*BlockChain, error) { + bc, err := NewBlockChain(db, cacheConfig, chainConfig, genesis, overrides, engine, vmConfig, shouldPreserve, txLookupLimit) + if err != nil { + return nil, err + } + bc.forceTriedbCommitHook = forceTriedbCommitHook + return bc, nil } // NewBlockChain returns a fully initialised block chain using information @@ -1493,7 +1508,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Full node or sparse archive node that's not keeping all states, do proper garbage collection bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(trieGcEntry{root, block.Header().Time}, -int64(block.NumberU64())) + bc.triegc.Push(trieGcEntry{root, block.Header().Time, block.GasUsed()}, -int64(block.NumberU64())) blockLimit := int64(block.NumberU64()) - int64(bc.cacheConfig.TriesInMemory) // only cleared if below that timeLimit := time.Now().Unix() - int64(bc.cacheConfig.TrieRetention.Seconds()) // only cleared if less than that @@ -1509,6 +1524,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } var prevEntry *trieGcEntry var prevNum uint64 + var forceCommit bool // Garbage collect anything below our required write retention for !bc.triegc.Empty() { triegcEntry, number := bc.triegc.Pop() @@ -1517,15 +1533,24 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. break } if prevEntry != nil { + bc.gasSinceLastForceCommit += block.GasUsed() + if bc.forceTriedbCommitHook != nil && bc.forceTriedbCommitHook(block, bc.gcproc+bc.processingSinceLastForceCommit, bc.gasSinceLastForceCommit) { + forceCommit = true + break + } bc.triedb.Dereference(prevEntry.Root) } prevEntry = &triegcEntry prevNum = uint64(-number) } + if prevEntry != nil && bc.forceTriedbCommitHook != nil && !forceCommit { + bc.gasSinceLastForceCommit += block.GasUsed() + forceCommit = bc.forceTriedbCommitHook(block, bc.gcproc+bc.processingSinceLastForceCommit, bc.gasSinceLastForceCommit) + } flushInterval := time.Duration(bc.flushInterval.Load()) // If we exceeded out time allowance, flush an entire trie to disk // In case of archive node that skips some trie commits we don't flush tries here - if bc.gcproc > flushInterval && prevEntry != nil && !archiveNode { + if prevEntry != nil && ((bc.gcproc > flushInterval && !archiveNode) || forceCommit) { // If the header is missing (canonical chain behind), we're reorging a low // diff sidechain. Suspend committing until this operation is completed. header := bc.GetHeaderByNumber(prevNum) @@ -1540,6 +1565,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Flush an entire trie and restart the counters bc.triedb.Commit(header.Root, true) bc.lastWrite = prevNum + if !forceCommit { + bc.processingSinceLastForceCommit += bc.gcproc + } else { + bc.processingSinceLastForceCommit = 0 + bc.gasSinceLastForceCommit = 0 + } bc.gcproc = 0 } } diff --git a/eth/protocols/arb/handler.go b/eth/protocols/arb/handler.go index 8a2f99b8e4..b48822b3c0 100644 --- a/eth/protocols/arb/handler.go +++ b/eth/protocols/arb/handler.go @@ -81,13 +81,14 @@ func HandleMessage(backend Backend, peer *Peer) error { } switch { case msg.Code == GetLastConfirmedMsg: - confirmed, node, err := backend.LastConfirmed() + confirmed, l1BlockNumber, node, err := backend.LastConfirmed() if err != nil || confirmed == nil { return err } response := LastConfirmedMsgPacket{ - Header: confirmed, - Node: node, + Header: confirmed, + L1BlockNumber: l1BlockNumber, + Node: node, } return p2p.Send(peer.rw, LastConfirmedMsg, &response) case msg.Code == LastConfirmedMsg: @@ -99,7 +100,7 @@ func HandleMessage(backend Backend, peer *Peer) error { if incoming.Header == nil { return nil } - backend.HandleLastConfirmed(peer, incoming.Header, incoming.Node) + backend.HandleLastConfirmed(peer, incoming.Header, incoming.L1BlockNumber, incoming.Node) return nil case msg.Code == GetLastCheckpointMsg: checkpoint, err := backend.LastCheckpoint() diff --git a/eth/protocols/arb/protocol.go b/eth/protocols/arb/protocol.go index 62a377f104..57eed84670 100644 --- a/eth/protocols/arb/protocol.go +++ b/eth/protocols/arb/protocol.go @@ -34,8 +34,9 @@ const ( ) type LastConfirmedMsgPacket struct { - Header *types.Header - Node uint64 + Header *types.Header + L1BlockNumber uint64 + Node uint64 } type CheckpointMsgPacket struct { @@ -62,9 +63,9 @@ type Handler func(peer *Peer) error // callback methods to invoke on remote deliveries. type Backend interface { PeerInfo(id enode.ID) interface{} - HandleLastConfirmed(peer *Peer, confirmed *types.Header, node uint64) + HandleLastConfirmed(peer *Peer, confirmed *types.Header, l1BlockNumber uint64, node uint64) HandleCheckpoint(peer *Peer, header *types.Header, supported bool) - LastConfirmed() (*types.Header, uint64, error) + LastConfirmed() (*types.Header, uint64, uint64, error) LastCheckpoint() (*types.Header, error) CheckpointSupported(*types.Header) (bool, error) // RunPeer is invoked when a peer joins on the `eth` protocol. The handler