Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement ReorgDetector V2 #34

Merged
merged 45 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
14f9ab1
Added e2e test for reorg detection
begmaroman Aug 7, 2024
af97f03
Implemented reorg monitor
begmaroman Aug 13, 2024
947b95c
Removed useless test
begmaroman Aug 13, 2024
ff2cf6d
Revert changes
begmaroman Aug 13, 2024
aa2fd26
Implementation
begmaroman Aug 20, 2024
ac4cbe4
Implementation
begmaroman Aug 20, 2024
9517b4b
Implementation
begmaroman Aug 20, 2024
52557ef
Updated tests
begmaroman Aug 20, 2024
0c9e883
Merge remote-tracking branch 'origin/develop' into feature/CDK-385
begmaroman Aug 20, 2024
ac5f738
Updated tests
begmaroman Aug 20, 2024
49fa7d8
Updated tests
begmaroman Aug 20, 2024
0dc2370
Implementation
begmaroman Aug 21, 2024
47c3b94
Implementation
begmaroman Aug 21, 2024
9913d87
Implementation
begmaroman Aug 21, 2024
6028440
Implementation
begmaroman Aug 21, 2024
4ebf12b
Merge remote-tracking branch 'origin/develop' into feature/CDK-385
begmaroman Aug 21, 2024
92e5681
Synced with main
begmaroman Aug 21, 2024
6af5644
Fixed tests
begmaroman Aug 21, 2024
e633b47
Implementation
begmaroman Aug 21, 2024
b1542dc
Implementation
begmaroman Aug 22, 2024
4835d9a
Updated E2E test
begmaroman Aug 22, 2024
9135a2d
Minor update
begmaroman Aug 22, 2024
6c0e817
Minor update
begmaroman Aug 22, 2024
fbcf805
Minor update
begmaroman Aug 22, 2024
b82d0a8
Minor update
begmaroman Aug 22, 2024
892d0d6
Minor update
begmaroman Aug 22, 2024
d88f723
Minor update
begmaroman Aug 22, 2024
c723407
Minor update
begmaroman Aug 22, 2024
fc38452
Minor update
begmaroman Aug 22, 2024
0c91e6f
Finalized reorg detector
begmaroman Aug 26, 2024
63857fd
Finalized reorg detector
begmaroman Aug 26, 2024
9e781f4
Fixed unit tests
begmaroman Aug 26, 2024
2237684
Fixed unit tests
begmaroman Aug 26, 2024
6bdba60
Fixed unit tests
begmaroman Aug 26, 2024
c982e79
Fixed unit tests
begmaroman Aug 26, 2024
effc6aa
Fixed unit tests
begmaroman Aug 26, 2024
5cb68f0
Fixed unit tests
begmaroman Aug 26, 2024
ca3b8f3
Added debug log
begmaroman Aug 26, 2024
569affb
Removed debug logs
begmaroman Aug 26, 2024
e864d36
Fixed e2e tests
begmaroman Aug 26, 2024
e57badb
Fixed e2e tests
begmaroman Aug 26, 2024
cd3d3ef
Fixed e2e tests
begmaroman Aug 26, 2024
3b9a15a
Address comments
begmaroman Aug 26, 2024
d0bc399
Address comments
begmaroman Aug 27, 2024
0b672f9
Address comments
begmaroman Aug 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 265 additions & 0 deletions reorgmonitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
package reorgmonitor

import (
"context"
"fmt"
"log"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
)

type EthClient interface {
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockNumber(ctx context.Context) (uint64, error)
}

type ReorgMonitor struct {
lock sync.Mutex

client EthClient

maxBlocksInCache int
lastBlockHeight uint64

newReorgChan chan<- *Reorg
knownReorgs map[string]uint64 // key: reorgId, value: endBlockNumber

blockByHash map[common.Hash]*Block
blocksByHeight map[uint64]map[common.Hash]*Block

EarliestBlockNumber uint64
LatestBlockNumber uint64
}

func NewReorgMonitor(client EthClient, reorgChan chan<- *Reorg, maxBlocks int) *ReorgMonitor {
return &ReorgMonitor{
client: client,
maxBlocksInCache: maxBlocks,
blockByHash: make(map[common.Hash]*Block),
blocksByHeight: make(map[uint64]map[common.Hash]*Block),
newReorgChan: reorgChan,
knownReorgs: make(map[string]uint64),
}
}

// AddBlockToTrack adds a block to the monitor, and sends it to the monitor's channel.
// After adding a new block, a reorg check takes place. If a new completed reorg is detected, it is sent to the channel.
func (mon *ReorgMonitor) AddBlockToTrack(block *Block) error {
mon.lock.Lock()
defer mon.lock.Unlock()

mon.addBlock(block)

// Do nothing if block is at previous height
if block.Number == mon.lastBlockHeight {
return nil
}

if len(mon.blocksByHeight) < 3 {
return nil
}

// Analyze blocks once a new height has been reached
mon.lastBlockHeight = block.Number

anal, err := mon.AnalyzeTree(0, 2)
begmaroman marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

for _, reorg := range anal.Reorgs {
if !reorg.IsFinished { // don't care about unfinished reorgs
continue
}

// Send new finished reorgs to channel
if _, isKnownReorg := mon.knownReorgs[reorg.Id()]; !isKnownReorg {
mon.knownReorgs[reorg.Id()] = reorg.EndBlockHeight
mon.newReorgChan <- reorg
}
}

return nil
}

// addBlock adds a block to history if it hasn't been seen before, and download unknown referenced blocks (parent, uncles).
func (mon *ReorgMonitor) addBlock(block *Block) bool {
defer mon.trimCache()

// If known, then only overwrite if known was by uncle
knownBlock, isKnown := mon.blockByHash[block.Hash]
if isKnown && knownBlock.Origin != OriginUncle {
return false
}

// Only accept blocks that are after the earliest known (some nodes might be further back)
if block.Number < mon.EarliestBlockNumber {
return false
}

// Print
blockInfo := fmt.Sprintf("Add%s \t %-12s \t %s", block.String(), block.Origin, mon)
log.Println(blockInfo)

// Add for access by hash
mon.blockByHash[block.Hash] = block

// Create array of blocks at this height, if necessary
if _, found := mon.blocksByHeight[block.Number]; !found {
mon.blocksByHeight[block.Number] = make(map[common.Hash]*Block)
}

// Add to map of blocks at this height
mon.blocksByHeight[block.Number][block.Hash] = block

// Set earliest block
if mon.EarliestBlockNumber == 0 || block.Number < mon.EarliestBlockNumber {
mon.EarliestBlockNumber = block.Number
}

// Set latest block
if block.Number > mon.LatestBlockNumber {
mon.LatestBlockNumber = block.Number
}

// Check if further blocks can be downloaded from this one
if block.Number > mon.EarliestBlockNumber { // check backhistory only if we are past the earliest block
err := mon.checkBlockForReferences(block)
if err != nil {
log.Println(err)
}
}

return true
}

func (mon *ReorgMonitor) trimCache() {
// Trim reorg history
for reorgId, reorgEndBlockheight := range mon.knownReorgs {
if reorgEndBlockheight < mon.EarliestBlockNumber {
delete(mon.knownReorgs, reorgId)
}
}

for currentHeight := mon.EarliestBlockNumber; currentHeight < mon.LatestBlockNumber; currentHeight++ {
blocks, heightExists := mon.blocksByHeight[currentHeight]
if !heightExists {
continue
}

// Set new lowest block number
mon.EarliestBlockNumber = currentHeight

// Stop if trimmed enough
if len(mon.blockByHash) <= mon.maxBlocksInCache {
return
}

// Trim
for hash := range blocks {
delete(mon.blocksByHeight[currentHeight], hash)
delete(mon.blockByHash, hash)
}
delete(mon.blocksByHeight, currentHeight)
}
}

func (mon *ReorgMonitor) checkBlockForReferences(block *Block) error {
// Check parent
_, found := mon.blockByHash[block.ParentHash]
if !found {
// fmt.Printf("- parent of %d %s not found (%s), downloading...\n", block.Number, block.Hash, block.ParentHash)
_, _, err := mon.ensureBlock(block.ParentHash, OriginGetParent)
if err != nil {
return errors.Wrap(err, "get-parent error")
}
}

// Check uncles
for _, uncleHeader := range block.Block.Uncles() {
// fmt.Printf("- block %d %s has uncle: %s\n", block.Number, block.Hash, uncleHeader.Hash())
_, _, err := mon.ensureBlock(uncleHeader.Hash(), OriginUncle)
if err != nil {
return errors.Wrap(err, "get-uncle error")
}
}

// ro.DebugPrintln(fmt.Sprintf("- added block %d %s", block.NumberU64(), block.Hash()))
return nil
}

func (mon *ReorgMonitor) ensureBlock(blockHash common.Hash, origin BlockOrigin) (block *Block, alreadyExisted bool, err error) {
// Check and potentially download block
var found bool
block, found = mon.blockByHash[blockHash]
if found {
return block, true, nil
}

fmt.Printf("- block %s (%s) not found, downloading from...\n", blockHash, origin)
ethBlock, err := mon.client.BlockByHash(context.Background(), blockHash)
if err != nil {
fmt.Println("- err block not found:", blockHash, err) // todo: try other clients
msg := fmt.Sprintf("EnsureBlock error for hash %s", blockHash)
return nil, false, errors.Wrap(err, msg)
}

block = NewBlock(ethBlock, origin)

// Add a new block without sending to channel, because that makes reorg.AddBlock() asynchronous,
// but we want reorg.AddBlock() to wait until all references are added.
mon.addBlock(block)

return block, false, nil
}

func (mon *ReorgMonitor) AnalyzeTree(maxBlocks, distanceToLastBlockHeight uint64) (*TreeAnalysis, error) {
// Set end height of search
endBlockNumber := mon.LatestBlockNumber - distanceToLastBlockHeight

// Set start height of search
startBlockNumber := mon.EarliestBlockNumber
if maxBlocks > 0 && endBlockNumber-maxBlocks > mon.EarliestBlockNumber {
startBlockNumber = endBlockNumber - maxBlocks
}

// Build tree datastructure
tree := NewBlockTree()
for height := startBlockNumber; height <= endBlockNumber; height++ {
numBlocksAtHeight := len(mon.blocksByHeight[height])
if numBlocksAtHeight == 0 {
err := fmt.Errorf("error in monitor.AnalyzeTree: no blocks at height %d", height)
return nil, err
}

// Start tree only when 1 block at this height. If more blocks then skip.
if tree.FirstNode == nil && numBlocksAtHeight > 1 {
continue
}

// Add all blocks at this height to the tree
for _, currentBlock := range mon.blocksByHeight[height] {
err := tree.AddBlock(currentBlock)
if err != nil {
return nil, errors.Wrap(err, "monitor.AnalyzeTree->tree.AddBlock error")
}
}
}

// Get analysis of tree
anal, err := NewTreeAnalysis(tree)
if err != nil {
return nil, errors.Wrap(err, "monitor.AnalyzeTree->NewTreeAnalysis error")
}

return anal, nil
}

func (mon *ReorgMonitor) String() string {
return fmt.Sprintf("ReorgMonitor: %d - %d, %d / %d blocks, %d reorgcache", mon.EarliestBlockNumber, mon.LatestBlockNumber, len(mon.blockByHash), len(mon.blocksByHeight), len(mon.knownReorgs))
}
110 changes: 110 additions & 0 deletions reorgmonitor/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package reorgmonitor

import (
"context"
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/simulated"
"github.com/stretchr/testify/require"
)

func newSimulatedL1(t *testing.T, auth *bind.TransactOpts) *simulated.Backend {
t.Helper()

balance, _ := new(big.Int).SetString("10000000000000000000000000", 10) //nolint:gomnd

blockGasLimit := uint64(999999999999999999) //nolint:gomnd
client := simulated.NewBackend(map[common.Address]types.Account{
auth.From: {
Balance: balance,
},
}, simulated.WithBlockGasLimit(blockGasLimit))
client.Commit()

return client
}

func Test_ReorgMonitor(t *testing.T) {
const produceBlocks = 29
const reorgPeriod = 5
const reorgDepth = 2

ctx := context.Background()

// Simulated L1
privateKeyL1, err := crypto.GenerateKey()
require.NoError(t, err)
authL1, err := bind.NewKeyedTransactorWithChainID(privateKeyL1, big.NewInt(1337))
require.NoError(t, err)
clientL1 := newSimulatedL1(t, authL1)
require.NoError(t, err)

reorgChan := make(chan *Reorg, 100)
mon := NewReorgMonitor(clientL1.Client(), reorgChan, 100)

// Add head tracker
ch := make(chan *types.Header, 100)
sub, err := clientL1.Client().SubscribeNewHead(ctx, ch)
require.NoError(t, err)
go func() {
for {
select {
case <-ctx.Done():
return
case <-sub.Err():
return
case header := <-ch:
block, err := clientL1.Client().BlockByNumber(ctx, header.Number)
require.NoError(t, err)

err = mon.AddBlockToTrack(NewBlock(block, OriginSubscription))
require.NoError(t, err)
}
}
}()

expectedReorgBlocks := make(map[uint64]struct{})
lastReorgOn := int64(0)
for i := 1; lastReorgOn <= produceBlocks; i++ {
block := clientL1.Commit()
time.Sleep(time.Millisecond)

header, err := clientL1.Client().HeaderByHash(ctx, block)
require.NoError(t, err)
headerNumber := header.Number.Int64()

// Reorg every "reorgPeriod" blocks with "reorgDepth" blocks depth
if headerNumber > lastReorgOn && headerNumber%reorgPeriod == 0 {
lastReorgOn = headerNumber

reorgBlock, err := clientL1.Client().BlockByNumber(ctx, big.NewInt(headerNumber-reorgDepth))
require.NoError(t, err)

fmt.Println("Forking from block", reorgBlock.Number(), "on block", headerNumber)
expectedReorgBlocks[reorgBlock.NumberU64()] = struct{}{}

err = clientL1.Fork(reorgBlock.Hash())
require.NoError(t, err)
}
}

// Commit some blocks to ensure reorgs are detected
for i := 0; i < reorgPeriod; i++ {
clientL1.Commit()
}

fmt.Println("Expected reorg blocks", expectedReorgBlocks)

for range expectedReorgBlocks {
reorg := <-reorgChan
_, ok := expectedReorgBlocks[reorg.StartBlockHeight-1]
require.True(t, ok, "unexpected reorg starting from", reorg.StartBlockHeight-1)
}
}
Loading
Loading