From 396258d9d0cb1e516ff5d2c9a16d092097dfd38e Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 18 Jul 2024 16:12:12 +0200 Subject: [PATCH 01/11] feat: reorg detector --- common/common.go | 16 + localbridgesync/downloader_test.go | 5 +- localbridgesync/processor.go | 27 +- reorgdetector/mock_eth_client.go | 89 ++++++ reorgdetector/reorgdetector.go | 440 ++++++++++++++++++++++------ reorgdetector/reorgdetector_test.go | 228 ++++++++++++++ test/Makefile | 12 +- 7 files changed, 713 insertions(+), 104 deletions(-) create mode 100644 common/common.go create mode 100644 reorgdetector/mock_eth_client.go create mode 100644 reorgdetector/reorgdetector_test.go diff --git a/common/common.go b/common/common.go new file mode 100644 index 00000000..e211a335 --- /dev/null +++ b/common/common.go @@ -0,0 +1,16 @@ +package common + +import "encoding/binary" + +// BlockNum2Bytes converts a block number to a byte slice +func BlockNum2Bytes(blockNum uint64) []byte { + key := make([]byte, 8) + binary.LittleEndian.PutUint64(key, blockNum) + + return key +} + +// Bytes2BlockNum converts a byte slice to a block number +func Bytes2BlockNum(key []byte) uint64 { + return binary.LittleEndian.Uint64(key) +} diff --git a/localbridgesync/downloader_test.go b/localbridgesync/downloader_test.go index c60f4d3f..553efbec 100644 --- a/localbridgesync/downloader_test.go +++ b/localbridgesync/downloader_test.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge" "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2" + cdkcommon "github.com/0xPolygon/cdk/common" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/log" "github.com/ethereum/go-ethereum" @@ -203,7 +204,7 @@ func generateBridge(t *testing.T, blockNum uint32) (*types.Log, Bridge) { log := &types.Log{ Address: contractAddr, BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))), + BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), Topics: []common.Hash{bridgeEventSignature}, Data: data, } @@ -262,7 +263,7 @@ func generateClaim(t *testing.T, blockNum uint32, event *abi.Event, isV1 bool) ( log := &types.Log{ Address: contractAddr, BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))), + BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), Topics: []common.Hash{signature}, Data: data, } diff --git a/localbridgesync/processor.go b/localbridgesync/processor.go index 2e6f27a0..69c885b8 100644 --- a/localbridgesync/processor.go +++ b/localbridgesync/processor.go @@ -2,10 +2,10 @@ package localbridgesync import ( "context" - "encoding/binary" "encoding/json" "errors" + "github.com/0xPolygon/cdk/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) @@ -57,6 +57,9 @@ func (p *processor) GetClaimsAndBridges( } defer tx.Rollback() lpb, err := p.getLastProcessedBlockWithTx(tx) + if err != nil { + return nil, err + } if lpb < toBlock { return nil, ErrBlockNotProcessed } @@ -66,11 +69,11 @@ func (p *processor) GetClaimsAndBridges( } defer c.Close() - for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { + for k, v, err := c.Seek(common.BlockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { if err != nil { return nil, err } - if bytes2BlockNum(k) > toBlock { + if common.Bytes2BlockNum(k) > toBlock { break } blockEvents := []BridgeEvent{} @@ -99,7 +102,7 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { } else if blockNumBytes == nil { return 0, nil } else { - return bytes2BlockNum(blockNumBytes), nil + return common.Bytes2BlockNum(blockNumBytes), nil } } @@ -113,7 +116,7 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return err } defer c.Close() - firstKey := blockNum2Bytes(firstReorgedBlock) + firstKey := common.BlockNum2Bytes(firstReorgedBlock) for k, _, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() { if err != nil { tx.Rollback() @@ -142,7 +145,7 @@ func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) err tx.Rollback() return err } - if err := tx.Put(eventsTable, blockNum2Bytes(blockNum), value); err != nil { + if err := tx.Put(eventsTable, common.BlockNum2Bytes(blockNum), value); err != nil { tx.Rollback() return err } @@ -155,16 +158,6 @@ func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) err } func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error { - blockNumBytes := blockNum2Bytes(blockNum) + blockNumBytes := common.BlockNum2Bytes(blockNum) return tx.Put(lastBlockTable, lastBlokcKey, blockNumBytes) } - -func blockNum2Bytes(blockNum uint64) []byte { - key := make([]byte, 8) - binary.LittleEndian.PutUint64(key, blockNum) - return key -} - -func bytes2BlockNum(key []byte) uint64 { - return binary.LittleEndian.Uint64(key) -} diff --git a/reorgdetector/mock_eth_client.go b/reorgdetector/mock_eth_client.go new file mode 100644 index 00000000..add883f6 --- /dev/null +++ b/reorgdetector/mock_eth_client.go @@ -0,0 +1,89 @@ +// Code generated by mockery v2.42.1. DO NOT EDIT. + +package reorgdetector + +import ( + context "context" + big "math/big" + + mock "github.com/stretchr/testify/mock" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// EthClientMock is an autogenerated mock type for the EthClient type +type EthClientMock struct { + mock.Mock +} + +// BlockNumber provides a mock function with given fields: ctx +func (_m *EthClientMock) BlockNumber(ctx context.Context) (uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for BlockNumber") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HeaderByNumber provides a mock function with given fields: ctx, number +func (_m *EthClientMock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewEthClientMock creates a new instance of EthClientMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEthClientMock(t interface { + mock.TestingT + Cleanup(func()) +}) *EthClientMock { + mock := &EthClientMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 06d15e85..c59d6dce 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -2,14 +2,19 @@ package reorgdetector import ( "context" + "encoding/json" "errors" "math/big" + "sort" "sync" "time" + "github.com/0xPolygon/cdk/log" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) // TODO: consider the case where blocks can disappear, current implementation assumes that if there is a reorg, @@ -18,62 +23,179 @@ import ( const ( waitPeriodBlockRemover = time.Second * 20 waitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain + + subscriberBlocks = "reorgdetector-subscriberBlocks" + + unfalisedBlocksID = "unfinalisedBlocks" ) var ( ErrNotSubscribed = errors.New("id not found in subscriptions") ErrInvalidBlockHash = errors.New("the block hash does not match with the expected block hash") + ErrIDReserverd = errors.New("subscription id is reserved") ) +func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + subscriberBlocks: {}, + } +} + +type EthClient interface { + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + BlockNumber(ctx context.Context) (uint64, error) +} + +type block struct { + Num uint64 + Hash common.Hash +} + +type blockMap map[uint64]block + +// newBlockMap returns a new instance of blockMap +func newBlockMap(blocks ...block) blockMap { + blockMap := make(blockMap, len(blocks)) + + for _, b := range blocks { + blockMap[b.Num] = b + } + + return blockMap +} + +// getSorted returns blocks in sorted order +func (bm blockMap) getSorted() []block { + sortedBlocks := make([]block, 0, len(bm)) + + for _, b := range bm { + sortedBlocks = append(sortedBlocks, b) + } + + sort.Slice(sortedBlocks, func(i, j int) bool { + return sortedBlocks[i].Num < sortedBlocks[j].Num + }) + + return sortedBlocks +} + +// getFromBlockSorted returns blocks from blockNum in sorted order without including the blockNum +func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { + sortedBlocks := bm.getSorted() + numOfBlocks := len(sortedBlocks) + lastBlock := sortedBlocks[numOfBlocks-1].Num + + if blockNum < lastBlock { + numOfBlocksToLeave := int(lastBlock - blockNum) + + if numOfBlocksToLeave > numOfBlocks { + numOfBlocksToLeave %= numOfBlocks + } + + newBlocks := make([]block, 0, numOfBlocksToLeave) + for i := numOfBlocks - numOfBlocksToLeave; i < numOfBlocks; i++ { + if sortedBlocks[i].Num < lastBlock { + // skip blocks that are finalised + continue + } + + newBlocks = append(newBlocks, sortedBlocks[i]) + } + + sortedBlocks = newBlocks + } else { + sortedBlocks = []block{} + } + + return sortedBlocks +} + type Subscription struct { FirstReorgedBlock chan uint64 ReorgProcessed chan bool - mu sync.Mutex pendingReorgsToBeProcessed *sync.WaitGroup } type ReorgDetector struct { - ethClient *ethclient.Client - mu sync.Mutex - unfinalisedBlocks map[uint64]common.Hash - trackedBlocks map[string]map[uint64]common.Hash // TODO: needs persistance! needs to be able to iterate in order! - // the channel is used to notify first invalid block + ethClient EthClient subscriptions map[string]*Subscription + + trackedBlocksLock sync.RWMutex + trackedBlocks map[string]blockMap + + db kv.RwDB } -func New(ctx context.Context) (*ReorgDetector, error) { - r := &ReorgDetector{} - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) +// New creates a new instance of ReorgDetector +func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector, error) { + db, err := mdbx.NewMDBX(nil). + Path(dbPath). + WithTableCfg(tableCfgFunc). + Open() if err != nil { return nil, err } - r.unfinalisedBlocks[lastFinalisedBlock.Number.Uint64()] = lastFinalisedBlock.Hash() - err = r.cleanStoredSubsBeforeStart(ctx, lastFinalisedBlock.Number.Uint64()) + + return newReorgDetector(ctx, client, db) +} + +// newReorgDetector creates a new instance of ReorgDetector +func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) { + r := &ReorgDetector{ + ethClient: client, + db: db, + subscriptions: make(map[string]*Subscription, 0), + } + + trackedBlocks, err := r.getTrackedBlocks(ctx) if err != nil { return nil, err } + + r.trackedBlocks = trackedBlocks + + lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + return nil, err + } + + if err = r.cleanStoredSubsBeforeStart(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { + return nil, err + } + return r, nil } func (r *ReorgDetector) Start(ctx context.Context) { - var lastFinalisedBlock uint64 - for lastFinalisedBlock = range r.unfinalisedBlocks { + unfinalisedBlocks, ok := r.trackedBlocks[unfalisedBlocksID] + if !ok { + unfinalisedBlocks = newBlockMap() } + + var lastUnfinalisedBlock uint64 + if len(unfinalisedBlocks) > 0 { + lastUnfinalisedBlock = unfinalisedBlocks.getSorted()[len(unfinalisedBlocks)-1].Num + } + go r.removeFinalisedBlocks(ctx) - go r.addUnfinalisedBlocks(ctx, lastFinalisedBlock+1) + go r.addUnfinalisedBlocks(ctx, lastUnfinalisedBlock+1) } -func (r *ReorgDetector) Subscribe(id string) *Subscription { +func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) { + if id == unfalisedBlocksID { + return nil, ErrIDReserverd + } + if sub, ok := r.subscriptions[id]; ok { - return sub + return sub, nil } sub := &Subscription{ FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), } r.subscriptions[id] = sub - r.trackedBlocks[id] = make(map[uint64]common.Hash) - return sub + + return sub, nil } func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error { @@ -82,14 +204,12 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum } else { // In case there are reorgs being processed, wait // Note that this also makes any addition to trackedBlocks[id] safe - sub.mu.Lock() - defer sub.mu.Unlock() sub.pendingReorgsToBeProcessed.Wait() } - if actualHash, ok := r.unfinalisedBlocks[blockNum]; ok { - if actualHash == blockHash { - r.trackedBlocks[id][blockNum] = blockHash - return nil + + if actualHash, ok := r.getUnfinalisedBlocksMap()[blockNum]; ok { + if actualHash.Hash == blockHash { + return r.saveTrackedBlock(ctx, id, block{Num: blockNum, Hash: blockHash}) } else { return ErrInvalidBlockHash } @@ -104,67 +224,89 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum return nil } else { // ReorgDetector has not added the requested block yet, adding it - r.trackedBlocks[id][blockNum] = blockHash - return nil + return r.saveTrackedBlock(ctx, id, block{Num: blockNum, Hash: blockHash}) } } } func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error { - for id := range r.trackedBlocks { - sub := &Subscription{ + blocksGotten := make(map[uint64]common.Hash, 0) + + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + for id, blocks := range r.trackedBlocks { + r.subscriptions[id] = &Subscription{ FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), } - r.subscriptions[id] = sub - if err := r.cleanStoredSubBeforeStart(ctx, id, latestFinalisedBlock); err != nil { - return err + + var ( + lastTrackedBlock uint64 + block block + actualBlockHash common.Hash + ok bool + ) + + if len(blocks) == 0 { + continue // nothing to process for this subscriber } - } - return nil -} -func (r *ReorgDetector) cleanStoredSubBeforeStart(ctx context.Context, id string, latestFinalisedBlock uint64) error { - blocks := r.trackedBlocks[id] - var lastTrackedBlock uint64 // TODO: get the greatest block num tracked - for expectedBlockNum, expectedBlockHash := range blocks { // should iterate in order - actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(expectedBlockNum))) - if err != nil { - return err + sortedBlocks := blocks.getSorted() + lastTrackedBlock = sortedBlocks[len(blocks)-1].Num + + for _, block = range blocks { + if actualBlockHash, ok = blocksGotten[block.Num]; !ok { + actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) + if err != nil { + return err + } + + actualBlockHash = actualBlock.Hash() + } + + if actualBlockHash != block.Hash { + // reorg detected, notify subscriber + if id != unfalisedBlocksID { + r.subscriptions[id].pendingReorgsToBeProcessed.Add(1) + go r.notifyReorgToSubscription(id, block.Num, lastTrackedBlock) + } + + break + } else if block.Num <= latestFinalisedBlock { + delete(blocks, block.Num) + } } - if actualBlock.Hash() != expectedBlockHash { - r.subscriptions[id].pendingReorgsToBeProcessed.Add(1) - go r.notifyReorgToSubscription(id, expectedBlockNum, lastTrackedBlock) - return nil - } else if expectedBlockNum < latestFinalisedBlock { - delete(blocks, expectedBlockNum) + + if err := r.updateTrackedBlocksNoLock(ctx, id, blocks); err != nil { + return err } } + return nil } func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { + ticker := time.NewTicker(waitPeriodBlockRemover) + for { - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { - // TODO: handle error - return - } - for i := lastFinalisedBlock.Number.Uint64(); i >= 0; i-- { - if _, ok := r.unfinalisedBlocks[i]; ok { - r.mu.Lock() - delete(r.unfinalisedBlocks, i) - r.mu.Unlock() - } else { - break + select { + case <-ticker.C: + lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + log.Error("reorg detector - error getting last finalised block", "err", err) + + continue } - for id, blocks := range r.trackedBlocks { - r.subscriptions[id].mu.Lock() - delete(blocks, i) - r.subscriptions[id].mu.Unlock() + + if err := r.removeTrackedBlocks(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { + log.Error("reorg detector - error removing tracked blocks", "err", err) + + continue } + case <-ctx.Done(): + return } - time.Sleep(waitPeriodBlockRemover) } } @@ -179,39 +321,40 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context, initBlock uint for currentBlock > lastBlockFromClient { lastBlockFromClient, err = r.ethClient.BlockNumber(ctx) if err != nil { - // TODO: handle error - return + log.Error("reorg detector - error getting last block number from client", "err", err) + time.Sleep(waitPeriodBlockAdder) + + continue } + time.Sleep(waitPeriodBlockAdder) } - block, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(currentBlock))) + + header, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(currentBlock))) if err != nil { - // TODO: handle error - return + log.Error("reorg detector - error getting block header from client", "err", err) } - prevBlockHash, ok := r.unfinalisedBlocks[currentBlock-1] - if !ok || block.ParentHash == prevBlockHash { - // previous block is correct or there is no previous block, add current block - r.mu.Lock() - r.unfinalisedBlocks[currentBlock] = block.Hash() - r.mu.Unlock() + + unfinalisedBlocks := r.getUnfinalisedBlocksMap() + + prevBlock, ok := unfinalisedBlocks[currentBlock-1] + if !ok || header.ParentHash == prevBlock.Hash { + r.saveTrackedBlock(ctx, unfalisedBlocksID, block{header.Number.Uint64(), header.Hash()}) if firstBlockReorged > 0 { r.notifyReorg(currentBlock, firstBlockReorged) } currentBlock++ firstBlockReorged = 0 - } else { + } else if header.ParentHash != prevBlock.Hash { // previous block is reorged: // 1. add a pending reorg to be processed for all the subscribers (so they don't add more blocks) // 2. remove block for _, sub := range r.subscriptions { - sub.mu.Lock() sub.pendingReorgsToBeProcessed.Add(1) - sub.mu.Unlock() } - r.mu.Lock() - delete(r.unfinalisedBlocks, currentBlock-1) - r.mu.Unlock() + + r.removeTrackedBlock(ctx, unfalisedBlocksID, currentBlock-1) + currentBlock-- if firstBlockReorged == 0 { firstBlockReorged = currentBlock @@ -227,7 +370,10 @@ func (r *ReorgDetector) notifyReorg(fromBlock, toBlock uint64) { } func (r *ReorgDetector) notifyReorgToSubscription(id string, fromBlock, toBlock uint64) { + r.trackedBlocksLock.RLock() blocks := r.trackedBlocks[id] + r.trackedBlocksLock.RUnlock() + sub := r.subscriptions[id] var found bool for i := fromBlock; i <= toBlock; i++ { @@ -242,5 +388,135 @@ func (r *ReorgDetector) notifyReorgToSubscription(id string, fromBlock, toBlock delete(blocks, i) } } + sub.pendingReorgsToBeProcessed.Done() } + +// getUnfinalisedBlocksMap returns the map of unfinalised blocks +func (r *ReorgDetector) getUnfinalisedBlocksMap() blockMap { + r.trackedBlocksLock.RLock() + defer r.trackedBlocksLock.RUnlock() + + return r.trackedBlocks[unfalisedBlocksID] +} + +// getTrackedBlocks returns a list of tracked blocks for each subscriber from db +func (r *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]blockMap, error) { + tx, err := r.db.BeginRo(ctx) + if err != nil { + return nil, err + } + + defer tx.Rollback() + + cursor, err := tx.Cursor(subscriberBlocks) + if err != nil { + return nil, err + } + + defer cursor.Close() + + trackedBlocks := make(map[string]blockMap, 0) + + for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { + if err != nil { + return nil, err + } + + var blocks []block + if err := json.Unmarshal(v, &blocks); err != nil { + return nil, err + } + + trackedBlocks[string(k)] = newBlockMap(blocks...) + } + + if _, ok := trackedBlocks[unfalisedBlocksID]; !ok { + // add unfinalised blocks to tracked blocks map if not present in db + trackedBlocks[unfalisedBlocksID] = newBlockMap() + } + + return trackedBlocks, nil +} + +// saveTrackedBlock saves the tracked block for a subscriber in db and in memory +func (r *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b block) error { + tx, err := r.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + r.trackedBlocksLock.Lock() + + subscriberBlockMap, ok := r.trackedBlocks[id] + if !ok { + subscriberBlockMap = newBlockMap(b) + r.trackedBlocks[id] = subscriberBlockMap + } + + r.trackedBlocksLock.Unlock() + + raw, err := json.Marshal(subscriberBlockMap.getSorted()) + if err != nil { + + return err + } + + return tx.Put(subscriberBlocks, []byte(id), raw) +} + +// removeTrackedBlock removes the tracked block for a subscriber in db and in memory +func (r *ReorgDetector) removeTrackedBlock(ctx context.Context, id string, blockNum uint64) error { + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + subscriberBlockMap, ok := r.trackedBlocks[id] + if !ok { + return nil + } + + delete(subscriberBlockMap, blockNum) + + return r.updateTrackedBlocksNoLock(ctx, id, subscriberBlockMap) +} + +// removeTrackedBlocks removes the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBlock uint64) error { + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + for id := range r.trackedBlocks { + newTrackedBlocks := r.trackedBlocks[id].getFromBlockSorted(lastFinalizedBlock) + + if err := r.updateTrackedBlocksNoLock(ctx, id, newBlockMap(newTrackedBlocks...)); err != nil { + return err + } + } + + return nil +} + +// updateTrackedBlocksNoLock updates the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) updateTrackedBlocksNoLock(ctx context.Context, id string, blocks blockMap) error { + tx, err := r.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + raw, err := json.Marshal(blocks.getSorted()) + if err != nil { + return err + } + + if err := tx.Put(subscriberBlocks, []byte(id), raw); err != nil { + return err + } + + r.trackedBlocks[id] = blocks + + return nil +} diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go new file mode 100644 index 00000000..77f546e3 --- /dev/null +++ b/reorgdetector/reorgdetector_test.go @@ -0,0 +1,228 @@ +package reorgdetector + +import ( + "context" + "encoding/json" + "errors" + "fmt" + big "math/big" + "os" + "reflect" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + types "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/stretchr/testify/require" +) + +const testSubscriber = "testSubscriber" + +// newTestDB creates new instance of db used by tests. +func newTestDB(tb testing.TB) kv.RwDB { + tb.Helper() + + dir := fmt.Sprintf("/tmp/reorgdetector-temp_%v", time.Now().UTC().Format(time.RFC3339Nano)) + err := os.Mkdir(dir, 0775) + + if err != nil { + tb.Fatal(err) + } + + db, err := mdbx.NewMDBX(nil). + Path(dir). + WithTableCfg(tableCfgFunc). + Open() + if err != nil { + tb.Fatal(err) + } + + tb.Cleanup(func() { + if err := os.RemoveAll(dir); err != nil { + tb.Fatal(err) + } + }) + + return db +} + +func TestBlockMap(t *testing.T) { + // Create a new block map + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + ) + + // Test getSorted function + sortedBlocks := bm.getSorted() + expectedSortedBlocks := []block{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { + t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) + } + + // Test getFromBlockSorted function + fromBlockSorted := bm.getFromBlockSorted(2) + expectedFromBlockSorted := []block{ + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } + + // Test getFromBlockSorted function when blockNum is greater than the last block + fromBlockSorted = bm.getFromBlockSorted(4) + expectedFromBlockSorted = []block{} + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } +} +func TestReorgDetector_New(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("first initialization, no data", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(100)}, nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 1) + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + require.True(t, exists) + require.Empty(t, unfinalisedBlocksMap) + }) + + t.Run("getting last finalized block failed", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + expectedErr := errors.New("some error") + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(nil, expectedErr) + + _, err := newReorgDetector(ctx, client, db) + require.ErrorIs(t, err, expectedErr) + }) + + t.Run("have tracked blocks and subscriptions no reorg - all blocks finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + testBlocks := createTestBlocks(t, 1, 6) + unfinalisedBlocks := testBlocks[:5] + testSubscriberBlocks := testBlocks[:3] + + insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + testBlocks[len(testBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + require.True(t, exists) + require.Len(t, unfinalisedBlocksMap, 0) // since all blocks are finalized + + testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] + require.True(t, exists) + require.Len(t, testSubscriberMap, 0) // since all blocks are finalized + }) + + t.Run("have tracked blocks and subscriptions no reorg - not all blocks finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + testBlocks := createTestBlocks(t, 1, 7) + unfinalisedBlocks := testBlocks[:6] + testSubscriberBlocks := testBlocks[:4] + + insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + testSubscriberBlocks[len(testSubscriberBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + require.True(t, exists) + require.Len(t, unfinalisedBlocksMap, len(unfinalisedBlocks)-len(testSubscriberBlocks)) // since all blocks are finalized + + testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] + require.True(t, exists) + require.Len(t, testSubscriberMap, 0) // since all blocks are finalized + }) +} + +func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { + t.Helper() + + blocks := make([]*types.Header, 0, count) + for i := startBlock; i < startBlock+count; i++ { + blocks = append(blocks, &types.Header{Number: big.NewInt(int64(i))}) + } + + return blocks +} + +func insertTestData(t *testing.T, ctx context.Context, db kv.RwDB, blocks []*types.Header, id string) { + t.Helper() + + // Insert some test data + err := db.Update(ctx, func(tx kv.RwTx) error { + + blockMap := newBlockMap() + for _, b := range blocks { + blockMap[b.Number.Uint64()] = block{b.Number.Uint64(), b.Hash()} + } + + raw, err := json.Marshal(blockMap.getSorted()) + if err != nil { + return err + } + + return tx.Put(subscriberBlocks, []byte(id), raw) + }) + + require.NoError(t, err) +} diff --git a/test/Makefile b/test/Makefile index 68b245f6..817e042b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,9 +1,15 @@ .PHONY: generate-mocks -generate-mocks: generate-mocks-localbridgesync +generate-mocks: + $(MAKE) generate-mocks-localbridgesync + $(MAKE) generate-mocks-reorgdetector .PHONY: generate-mocks-localbridgesync -generate-mocks-localbridgesync: ## Generates mocks for localbridgesync , using mockery tool +generate-mocks-localbridgesync: ## Generates mocks for localbridgesync, using mockery tool export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClienter --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=L2Mock --filename=mock_l2_test.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=downloaderFull --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=DownloaderMock --filename=mock_downloader_test.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=processorInterface --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ProcessorMock --filename=mock_processor_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go \ No newline at end of file + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go + +.PHONY: generate-mocks-reorgdetector +generate-mocks-reorgdetector: ## Generates mocks for reorgdetector, using mockery tool + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClient --dir=../reorgdetector --output=../reorgdetector --outpkg=reorgdetector --inpackage --structname=EthClientMock --filename=mock_eth_client.go \ No newline at end of file From 94a3f4cc13b114347427fb36c1d9dc6edcbcafdc Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Fri, 19 Jul 2024 17:49:05 +0200 Subject: [PATCH 02/11] fix: addUnfinilisedBlocks --- reorgdetector/reorgdetector.go | 183 ++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 85 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index c59d6dce..f8ec9c82 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -110,6 +110,27 @@ func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { return sortedBlocks } +// getClosestHigherBlock returns the closest higher block to the given blockNum +func (bm blockMap) getClosestHigherBlock(blockNum uint64) block { + if block, ok := bm[blockNum]; ok { + return block + } + + sorted := bm.getFromBlockSorted(blockNum) + if len(sorted) == 0 { + return block{} + } + + return sorted[0] +} + +// removeRange removes blocks from from to to +func (bm blockMap) removeRange(from, to uint64) { + for i := from; i <= to; i++ { + delete(bm, i) + } +} + type Subscription struct { FirstReorgedBlock chan uint64 ReorgProcessed chan bool @@ -167,18 +188,8 @@ func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*Reorg } func (r *ReorgDetector) Start(ctx context.Context) { - unfinalisedBlocks, ok := r.trackedBlocks[unfalisedBlocksID] - if !ok { - unfinalisedBlocks = newBlockMap() - } - - var lastUnfinalisedBlock uint64 - if len(unfinalisedBlocks) > 0 { - lastUnfinalisedBlock = unfinalisedBlocks.getSorted()[len(unfinalisedBlocks)-1].Num - } - go r.removeFinalisedBlocks(ctx) - go r.addUnfinalisedBlocks(ctx, lastUnfinalisedBlock+1) + go r.addUnfinalisedBlocks(ctx) } func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) { @@ -267,10 +278,10 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi if actualBlockHash != block.Hash { // reorg detected, notify subscriber - if id != unfalisedBlocksID { - r.subscriptions[id].pendingReorgsToBeProcessed.Add(1) - go r.notifyReorgToSubscription(id, block.Num, lastTrackedBlock) - } + go r.notifyReorgToSubscription(id, block.Num) + + // remove the reorged blocks from the tracked blocks + blocks.removeRange(block.Num, lastTrackedBlock) break } else if block.Num <= latestFinalisedBlock { @@ -278,6 +289,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi } } + // if we processed finalized or reorged blocks, update the tracked blocks in memory and db if err := r.updateTrackedBlocksNoLock(ctx, id, blocks); err != nil { return err } @@ -310,84 +322,92 @@ func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { } } -func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context, initBlock uint64) { +func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { var ( - firstBlockReorged uint64 - err error + lastUnfinalisedBlock uint64 + ticker = time.NewTicker(waitPeriodBlockAdder) + unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() ) - currentBlock := initBlock - lastBlockFromClient := currentBlock - 1 + + if len(unfinalisedBlocksMap) > 0 { + lastUnfinalisedBlock = unfinalisedBlocksMap.getSorted()[len(unfinalisedBlocksMap)-1].Num + } + for { - for currentBlock > lastBlockFromClient { - lastBlockFromClient, err = r.ethClient.BlockNumber(ctx) + select { + case <-ticker.C: + lastBlockFromClient, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) if err != nil { - log.Error("reorg detector - error getting last block number from client", "err", err) - time.Sleep(waitPeriodBlockAdder) - + log.Error("reorg detector - error getting last block from client", "err", err) continue } - time.Sleep(waitPeriodBlockAdder) - } - - header, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(currentBlock))) - if err != nil { - log.Error("reorg detector - error getting block header from client", "err", err) - } + if lastBlockFromClient.Number.Uint64() < lastUnfinalisedBlock { + // a reorg probably happened, and the client has less blocks than we have + // we should wait for the client to catch up so we can be sure + continue + } - unfinalisedBlocks := r.getUnfinalisedBlocksMap() + unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() + if len(unfinalisedBlocksMap) == 0 { + // no unfinalised blocks, just add this block to the map + if err := r.saveTrackedBlock(ctx, unfalisedBlocksID, block{ + Num: lastBlockFromClient.Number.Uint64(), + Hash: lastBlockFromClient.Hash(), + }); err != nil { + log.Error("reorg detector - error saving unfinalised block", "block", lastBlockFromClient.Number.Uint64(), "err", err) + } - prevBlock, ok := unfinalisedBlocks[currentBlock-1] - if !ok || header.ParentHash == prevBlock.Hash { - r.saveTrackedBlock(ctx, unfalisedBlocksID, block{header.Number.Uint64(), header.Hash()}) - if firstBlockReorged > 0 { - r.notifyReorg(currentBlock, firstBlockReorged) - } - currentBlock++ - firstBlockReorged = 0 - } else if header.ParentHash != prevBlock.Hash { - // previous block is reorged: - // 1. add a pending reorg to be processed for all the subscribers (so they don't add more blocks) - // 2. remove block - for _, sub := range r.subscriptions { - sub.pendingReorgsToBeProcessed.Add(1) + continue } - r.removeTrackedBlock(ctx, unfalisedBlocksID, currentBlock-1) + startBlock := lastBlockFromClient + unfinalisedBlocksSorted := unfinalisedBlocksMap.getSorted() + lastReorgBlock := uint64(0) + + for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- { + previousBlock, ok := unfinalisedBlocksMap[i-1] + if !ok || previousBlock.Hash == startBlock.ParentHash { + unfinalisedBlocksMap[i] = block{Num: startBlock.Number.Uint64(), Hash: startBlock.Hash()} + } else if previousBlock.Hash != startBlock.ParentHash { + // reorg happened, we will find out from where exactly and report this to subscribers + lastReorgBlock = i + } + } - currentBlock-- - if firstBlockReorged == 0 { - firstBlockReorged = currentBlock + if lastReorgBlock > 0 { + r.notifyReorgToAllSubscriptions(lastReorgBlock) + } else { + r.updateTrackedBlocks(ctx, unfalisedBlocksID, unfinalisedBlocksMap) } + case <-ctx.Done(): + return } } } -func (r *ReorgDetector) notifyReorg(fromBlock, toBlock uint64) { +func (r *ReorgDetector) notifyReorgToAllSubscriptions(reorgBlock uint64) { for id := range r.subscriptions { - go r.notifyReorgToSubscription(id, fromBlock, toBlock) + r.trackedBlocksLock.RLock() + subscriberBlocks := r.trackedBlocks[id] + r.trackedBlocksLock.RUnlock() + + go r.notifyReorgToSubscription(id, subscriberBlocks.getClosestHigherBlock(reorgBlock).Num) } } -func (r *ReorgDetector) notifyReorgToSubscription(id string, fromBlock, toBlock uint64) { - r.trackedBlocksLock.RLock() - blocks := r.trackedBlocks[id] - r.trackedBlocksLock.RUnlock() +func (r *ReorgDetector) notifyReorgToSubscription(id string, reorgBlock uint64) { + if id == unfalisedBlocksID { + return // unfinalised blocks are not subscribers + } sub := r.subscriptions[id] - var found bool - for i := fromBlock; i <= toBlock; i++ { - if _, ok := blocks[i]; ok { - if !found { - // notify about the first reorged block that was tracked - // and wait for the receiver to process - found = true - sub.FirstReorgedBlock <- i - <-sub.ReorgProcessed - } - delete(blocks, i) - } - } + sub.pendingReorgsToBeProcessed.Add(1) + + // notify about the first reorged block that was tracked + // and wait for the receiver to process + sub.FirstReorgedBlock <- reorgBlock + <-sub.ReorgProcessed sub.pendingReorgsToBeProcessed.Done() } @@ -467,21 +487,6 @@ func (r *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b block return tx.Put(subscriberBlocks, []byte(id), raw) } -// removeTrackedBlock removes the tracked block for a subscriber in db and in memory -func (r *ReorgDetector) removeTrackedBlock(ctx context.Context, id string, blockNum uint64) error { - r.trackedBlocksLock.Lock() - defer r.trackedBlocksLock.Unlock() - - subscriberBlockMap, ok := r.trackedBlocks[id] - if !ok { - return nil - } - - delete(subscriberBlockMap, blockNum) - - return r.updateTrackedBlocksNoLock(ctx, id, subscriberBlockMap) -} - // removeTrackedBlocks removes the tracked blocks for a subscriber in db and in memory func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBlock uint64) error { r.trackedBlocksLock.Lock() @@ -498,6 +503,14 @@ func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBl return nil } +// updateTrackedBlocks updates the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) updateTrackedBlocks(ctx context.Context, id string, blocks blockMap) error { + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + return r.updateTrackedBlocksNoLock(ctx, id, blocks) +} + // updateTrackedBlocksNoLock updates the tracked blocks for a subscriber in db and in memory func (r *ReorgDetector) updateTrackedBlocksNoLock(ctx context.Context, id string, blocks blockMap) error { tx, err := r.db.BeginRw(ctx) From 628bc135f48a629d4827fa53a55802c450067b17 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 22 Jul 2024 10:50:38 +0200 Subject: [PATCH 03/11] fix: small bug and UT --- reorgdetector/reorgdetector.go | 4 +-- reorgdetector/reorgdetector_test.go | 45 +++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index f8ec9c82..8f35e82e 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -134,7 +134,7 @@ func (bm blockMap) removeRange(from, to uint64) { type Subscription struct { FirstReorgedBlock chan uint64 ReorgProcessed chan bool - pendingReorgsToBeProcessed *sync.WaitGroup + pendingReorgsToBeProcessed sync.WaitGroup } type ReorgDetector struct { @@ -266,7 +266,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi sortedBlocks := blocks.getSorted() lastTrackedBlock = sortedBlocks[len(blocks)-1].Num - for _, block = range blocks { + for _, block = range sortedBlocks { if actualBlockHash, ok = blocksGotten[block.Num]; !ok { actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) if err != nil { diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 77f546e3..05c6ac23 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -84,6 +84,7 @@ func TestBlockMap(t *testing.T) { t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) } } + func TestReorgDetector_New(t *testing.T) { t.Parallel() @@ -192,6 +193,50 @@ func TestReorgDetector_New(t *testing.T) { require.True(t, exists) require.Len(t, testSubscriberMap, 0) // since all blocks are finalized }) + + t.Run("have tracked blocks and subscriptions - reorg happened", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + trackedBlocks := createTestBlocks(t, 1, 5) + testSubscriberBlocks := trackedBlocks[:5] + + insertTestData(t, ctx, db, nil, unfalisedBlocksID) // no unfinalised blocks + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range trackedBlocks[:3] { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + reorgedBlocks := createTestBlocks(t, 4, 2) // block 4, and 5 are reorged + reorgedBlocks[0].ParentHash = trackedBlocks[2].Hash() // block 4 is reorged but his parent is block 3 + reorgedBlocks[1].ParentHash = reorgedBlocks[0].Hash() // block 5 is reorged but his parent is block 4 + + client.On("HeaderByNumber", ctx, reorgedBlocks[0].Number).Return( + reorgedBlocks[0], nil, + ) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + reorgedBlocks[len(reorgedBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + // we wait for the subscriber to be notified about the reorg + firstReorgedBlock := <-rd.subscriptions[testSubscriber].FirstReorgedBlock + require.Equal(t, reorgedBlocks[0].Number.Uint64(), firstReorgedBlock) + + // all blocks should be cleaned from the tracked blocks + // since subscriber had 5 blocks, 3 were finalized, and 2 were reorged but also finalized + subscriberBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subscriberBlocks, 0) + }) } func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { From c06b58a961cb84c389d5c1b6d52afde5c8394bbe Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 22 Jul 2024 12:00:43 +0200 Subject: [PATCH 04/11] feat: AddBlockToTrack UT --- reorgdetector/reorgdetector_test.go | 208 ++++++++++++++++++++++++---- 1 file changed, 184 insertions(+), 24 deletions(-) diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 05c6ac23..62b6e56b 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -50,6 +50,8 @@ func newTestDB(tb testing.TB) kv.RwDB { } func TestBlockMap(t *testing.T) { + t.Parallel() + // Create a new block map bm := newBlockMap( block{Num: 1, Hash: common.HexToHash("0x123")}, @@ -57,32 +59,87 @@ func TestBlockMap(t *testing.T) { block{Num: 3, Hash: common.HexToHash("0x789")}, ) - // Test getSorted function - sortedBlocks := bm.getSorted() - expectedSortedBlocks := []block{ - {Num: 1, Hash: common.HexToHash("0x123")}, - {Num: 2, Hash: common.HexToHash("0x456")}, - {Num: 3, Hash: common.HexToHash("0x789")}, - } - if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { - t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) - } + t.Run("getSorted", func(t *testing.T) { + t.Parallel() - // Test getFromBlockSorted function - fromBlockSorted := bm.getFromBlockSorted(2) - expectedFromBlockSorted := []block{ - {Num: 3, Hash: common.HexToHash("0x789")}, - } - if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { - t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) - } + sortedBlocks := bm.getSorted() + expectedSortedBlocks := []block{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { + t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) + } + }) - // Test getFromBlockSorted function when blockNum is greater than the last block - fromBlockSorted = bm.getFromBlockSorted(4) - expectedFromBlockSorted = []block{} - if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { - t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) - } + t.Run("getFromBlockSorted", func(t *testing.T) { + t.Parallel() + + fromBlockSorted := bm.getFromBlockSorted(2) + expectedFromBlockSorted := []block{ + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } + + // Test getFromBlockSorted function when blockNum is greater than the last block + fromBlockSorted = bm.getFromBlockSorted(4) + expectedFromBlockSorted = []block{} + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } + }) + + t.Run("getClosestHigherBlock", func(t *testing.T) { + t.Parallel() + + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + ) + + // Test when the blockNum exists in the block map + b := bm.getClosestHigherBlock(2) + expectedBlock := block{Num: 2, Hash: common.HexToHash("0x456")} + if b != expectedBlock { + t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) + } + + // Test when the blockNum does not exist in the block map + b = bm.getClosestHigherBlock(4) + expectedBlock = block{Num: 0, Hash: common.Hash{}} + if b != expectedBlock { + t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) + } + }) + + t.Run("removeRange", func(t *testing.T) { + t.Parallel() + + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + block{Num: 4, Hash: common.HexToHash("0xabc")}, + block{Num: 5, Hash: common.HexToHash("0xdef")}, + ) + + bm.removeRange(3, 5) + + expectedBlocks := []block{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + } + + sortedBlocks := bm.getSorted() + + if !reflect.DeepEqual(sortedBlocks, expectedBlocks) { + t.Errorf("removeRange() failed, expected: %v, got: %v", expectedBlocks, sortedBlocks) + } + }) } func TestReorgDetector_New(t *testing.T) { @@ -239,6 +296,109 @@ func TestReorgDetector_New(t *testing.T) { }) } +func TestReorgDetector_AddBlockToTrack(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("no subscription", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 1, common.HexToHash("0x123")) + require.ErrorIs(t, err, ErrNotSubscribed) + }) + + t.Run("no unfinalised blocks - block already finalised", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Twice() + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 9, common.HexToHash("0x123")) // block already finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 0) // since block to track is already finalized no need to track it + }) + + t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Twice() + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 11, common.HexToHash("0x123")) // block not finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 1) + require.Equal(t, subBlocks[11].Hash, common.HexToHash("0x123")) + }) + + t.Run("have unfinalised blocks - block not finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + unfinalisedBlocks := createTestBlocks(t, 11, 5) + insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Once() + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 11, unfinalisedBlocks[0].Hash()) // block not finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 1) + require.Equal(t, subBlocks[11].Hash, unfinalisedBlocks[0].Hash()) + }) +} + func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { t.Helper() From acb1a3e0756a1997b683e6f5252676dc99a231b0 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 22 Jul 2024 12:39:27 +0200 Subject: [PATCH 05/11] feat: removeTrackedBlocks UT --- reorgdetector/reorgdetector.go | 27 +++++++++----- reorgdetector/reorgdetector_test.go | 55 +++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 8f35e82e..59cd7673 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -21,8 +21,8 @@ import ( // the client will have at least as many blocks as it had before the reorg, however this may not be the case for L2 const ( - waitPeriodBlockRemover = time.Second * 20 - waitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain + defaultWaitPeriodBlockRemover = time.Second * 20 + defaultWaitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain subscriberBlocks = "reorgdetector-subscriberBlocks" @@ -94,7 +94,7 @@ func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { newBlocks := make([]block, 0, numOfBlocksToLeave) for i := numOfBlocks - numOfBlocksToLeave; i < numOfBlocks; i++ { - if sortedBlocks[i].Num < lastBlock { + if sortedBlocks[i].Num < blockNum { // skip blocks that are finalised continue } @@ -145,6 +145,9 @@ type ReorgDetector struct { trackedBlocks map[string]blockMap db kv.RwDB + + waitPeriodBlockRemover time.Duration + waitPeriodBlockAdder time.Duration } // New creates a new instance of ReorgDetector @@ -162,10 +165,18 @@ func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector, // newReorgDetector creates a new instance of ReorgDetector func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) { + return newReorgDetectorWithPeriods(ctx, client, db, defaultWaitPeriodBlockRemover, defaultWaitPeriodBlockAdder) +} + +// newReorgDetectorWithPeriods creates a new instance of ReorgDetector with custom wait periods +func newReorgDetectorWithPeriods(ctx context.Context, client EthClient, db kv.RwDB, + waitPeriodBlockRemover, waitPeriodBlockAdder time.Duration) (*ReorgDetector, error) { r := &ReorgDetector{ - ethClient: client, - db: db, - subscriptions: make(map[string]*Subscription, 0), + ethClient: client, + db: db, + subscriptions: make(map[string]*Subscription, 0), + waitPeriodBlockRemover: waitPeriodBlockRemover, + waitPeriodBlockAdder: waitPeriodBlockAdder, } trackedBlocks, err := r.getTrackedBlocks(ctx) @@ -299,7 +310,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi } func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { - ticker := time.NewTicker(waitPeriodBlockRemover) + ticker := time.NewTicker(r.waitPeriodBlockRemover) for { select { @@ -325,7 +336,7 @@ func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { var ( lastUnfinalisedBlock uint64 - ticker = time.NewTicker(waitPeriodBlockAdder) + ticker = time.NewTicker(r.waitPeriodBlockAdder) unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() ) diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 62b6e56b..ae44b924 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -399,6 +399,61 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { }) } +func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + client := NewEthClientMock(t) + db := newTestDB(t) + + unfinalisedBlocks := createTestBlocks(t, 1, 10) + insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber) + + // call for removeFinalisedBlocks + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(5)}, nil, + ) + + rd := &ReorgDetector{ + ethClient: client, + db: db, + trackedBlocks: make(map[string]blockMap), + waitPeriodBlockRemover: 100 * time.Millisecond, + waitPeriodBlockAdder: 100 * time.Millisecond, + subscriptions: map[string]*Subscription{ + testSubscriber: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + unfalisedBlocksID: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + }, + } + + trackedBlocks, err := rd.getTrackedBlocks(ctx) + require.NoError(t, err) + require.Len(t, trackedBlocks, 2) + + rd.trackedBlocks = trackedBlocks + + // make sure we have all blocks in the tracked blocks before removing finalized blocks + require.Len(t, rd.trackedBlocks[unfalisedBlocksID], len(unfinalisedBlocks)) + require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks)) + + // remove finalized blocks + go rd.removeFinalisedBlocks(ctx) + + time.Sleep(3 * time.Second) // wait for the go routine to remove the finalized blocks + cancel() + + // make sure all blocks are removed from the tracked blocks + require.Len(t, rd.trackedBlocks[unfalisedBlocksID], 5) + require.Len(t, rd.trackedBlocks[testSubscriber], 5) +} + func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { t.Helper() From 1b345de6b5bc3b385eed22741c848c678041c19c Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Tue, 23 Jul 2024 12:00:41 +0200 Subject: [PATCH 06/11] fix: comments fix --- reorgdetector/reorgdetector.go | 90 ++++++++++++++++------------- reorgdetector/reorgdetector_test.go | 28 ++++----- 2 files changed, 65 insertions(+), 53 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 59cd7673..4b67aa11 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -26,7 +26,7 @@ const ( subscriberBlocks = "reorgdetector-subscriberBlocks" - unfalisedBlocksID = "unfinalisedBlocks" + unfinalisedBlocksID = "unfinalisedBlocks" ) var ( @@ -82,46 +82,34 @@ func (bm blockMap) getSorted() []block { // getFromBlockSorted returns blocks from blockNum in sorted order without including the blockNum func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { sortedBlocks := bm.getSorted() - numOfBlocks := len(sortedBlocks) - lastBlock := sortedBlocks[numOfBlocks-1].Num - if blockNum < lastBlock { - numOfBlocksToLeave := int(lastBlock - blockNum) - - if numOfBlocksToLeave > numOfBlocks { - numOfBlocksToLeave %= numOfBlocks - } - - newBlocks := make([]block, 0, numOfBlocksToLeave) - for i := numOfBlocks - numOfBlocksToLeave; i < numOfBlocks; i++ { - if sortedBlocks[i].Num < blockNum { - // skip blocks that are finalised - continue - } - - newBlocks = append(newBlocks, sortedBlocks[i]) + index := -1 + for i, b := range sortedBlocks { + if b.Num > blockNum { + index = i + break } + } - sortedBlocks = newBlocks - } else { - sortedBlocks = []block{} + if index == -1 { + return []block{} } - return sortedBlocks + return sortedBlocks[index:] } // getClosestHigherBlock returns the closest higher block to the given blockNum -func (bm blockMap) getClosestHigherBlock(blockNum uint64) block { +func (bm blockMap) getClosestHigherBlock(blockNum uint64) (block, bool) { if block, ok := bm[blockNum]; ok { - return block + return block, true } sorted := bm.getFromBlockSorted(blockNum) if len(sorted) == 0 { - return block{} + return block{}, false } - return sorted[0] + return sorted[0], true } // removeRange removes blocks from from to to @@ -204,7 +192,7 @@ func (r *ReorgDetector) Start(ctx context.Context) { } func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) { - if id == unfalisedBlocksID { + if id == unfinalisedBlocksID { return nil, ErrIDReserverd } @@ -362,7 +350,7 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() if len(unfinalisedBlocksMap) == 0 { // no unfinalised blocks, just add this block to the map - if err := r.saveTrackedBlock(ctx, unfalisedBlocksID, block{ + if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block{ Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash(), }); err != nil { @@ -374,22 +362,32 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { startBlock := lastBlockFromClient unfinalisedBlocksSorted := unfinalisedBlocksMap.getSorted() - lastReorgBlock := uint64(0) + reorgBlock := uint64(0) for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- { previousBlock, ok := unfinalisedBlocksMap[i-1] - if !ok || previousBlock.Hash == startBlock.ParentHash { + if !ok { + b, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + log.Error("reorg detector - error getting previous block", "block", i-1, "err", err) + break // stop processing blocks, and we will try to detect it in the next iteration + } + + previousBlock = block{Num: b.Number.Uint64(), Hash: b.Hash()} + } + + if previousBlock.Hash == startBlock.ParentHash { unfinalisedBlocksMap[i] = block{Num: startBlock.Number.Uint64(), Hash: startBlock.Hash()} } else if previousBlock.Hash != startBlock.ParentHash { // reorg happened, we will find out from where exactly and report this to subscribers - lastReorgBlock = i + reorgBlock = i } } - if lastReorgBlock > 0 { - r.notifyReorgToAllSubscriptions(lastReorgBlock) + if reorgBlock > 0 { + r.notifyReorgToAllSubscriptions(reorgBlock) } else { - r.updateTrackedBlocks(ctx, unfalisedBlocksID, unfinalisedBlocksMap) + r.updateTrackedBlocks(ctx, unfinalisedBlocksID, unfinalisedBlocksMap) } case <-ctx.Done(): return @@ -403,13 +401,25 @@ func (r *ReorgDetector) notifyReorgToAllSubscriptions(reorgBlock uint64) { subscriberBlocks := r.trackedBlocks[id] r.trackedBlocksLock.RUnlock() - go r.notifyReorgToSubscription(id, subscriberBlocks.getClosestHigherBlock(reorgBlock).Num) + closestBlock, exists := subscriberBlocks.getClosestHigherBlock(reorgBlock) + + if exists { + go r.notifyReorgToSubscription(id, closestBlock.Num) + + // remove reorged blocks from tracked blocks + sorted := subscriberBlocks.getSorted() + subscriberBlocks.removeRange(closestBlock.Num, sorted[len(sorted)-1].Num) + if err := r.updateTrackedBlocks(context.Background(), id, subscriberBlocks); err != nil { + log.Error("reorg detector - error updating tracked blocks", "err", err) + } + } } } func (r *ReorgDetector) notifyReorgToSubscription(id string, reorgBlock uint64) { - if id == unfalisedBlocksID { - return // unfinalised blocks are not subscribers + if id == unfinalisedBlocksID { + // unfinalised blocks are not subscribers, and reorg block should be > 0 + return } sub := r.subscriptions[id] @@ -428,7 +438,7 @@ func (r *ReorgDetector) getUnfinalisedBlocksMap() blockMap { r.trackedBlocksLock.RLock() defer r.trackedBlocksLock.RUnlock() - return r.trackedBlocks[unfalisedBlocksID] + return r.trackedBlocks[unfinalisedBlocksID] } // getTrackedBlocks returns a list of tracked blocks for each subscriber from db @@ -462,9 +472,9 @@ func (r *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]blockM trackedBlocks[string(k)] = newBlockMap(blocks...) } - if _, ok := trackedBlocks[unfalisedBlocksID]; !ok { + if _, ok := trackedBlocks[unfinalisedBlocksID]; !ok { // add unfinalised blocks to tracked blocks map if not present in db - trackedBlocks[unfalisedBlocksID] = newBlockMap() + trackedBlocks[unfinalisedBlocksID] = newBlockMap() } return trackedBlocks, nil diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index ae44b924..904495bc 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -102,14 +102,16 @@ func TestBlockMap(t *testing.T) { ) // Test when the blockNum exists in the block map - b := bm.getClosestHigherBlock(2) + b, exists := bm.getClosestHigherBlock(2) + require.True(t, exists) expectedBlock := block{Num: 2, Hash: common.HexToHash("0x456")} if b != expectedBlock { t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) } // Test when the blockNum does not exist in the block map - b = bm.getClosestHigherBlock(4) + b, exists = bm.getClosestHigherBlock(4) + require.False(t, exists) expectedBlock = block{Num: 0, Hash: common.Hash{}} if b != expectedBlock { t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) @@ -161,7 +163,7 @@ func TestReorgDetector_New(t *testing.T) { require.NoError(t, err) require.Len(t, rd.trackedBlocks, 1) - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] require.True(t, exists) require.Empty(t, unfinalisedBlocksMap) }) @@ -189,7 +191,7 @@ func TestReorgDetector_New(t *testing.T) { unfinalisedBlocks := testBlocks[:5] testSubscriberBlocks := testBlocks[:3] - insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) for _, block := range unfinalisedBlocks { @@ -206,7 +208,7 @@ func TestReorgDetector_New(t *testing.T) { require.NoError(t, err) require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] require.True(t, exists) require.Len(t, unfinalisedBlocksMap, 0) // since all blocks are finalized @@ -225,7 +227,7 @@ func TestReorgDetector_New(t *testing.T) { unfinalisedBlocks := testBlocks[:6] testSubscriberBlocks := testBlocks[:4] - insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) for _, block := range unfinalisedBlocks { @@ -242,7 +244,7 @@ func TestReorgDetector_New(t *testing.T) { require.NoError(t, err) require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfalisedBlocksID] + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] require.True(t, exists) require.Len(t, unfinalisedBlocksMap, len(unfinalisedBlocks)-len(testSubscriberBlocks)) // since all blocks are finalized @@ -260,7 +262,7 @@ func TestReorgDetector_New(t *testing.T) { trackedBlocks := createTestBlocks(t, 1, 5) testSubscriberBlocks := trackedBlocks[:5] - insertTestData(t, ctx, db, nil, unfalisedBlocksID) // no unfinalised blocks + insertTestData(t, ctx, db, nil, unfinalisedBlocksID) // no unfinalised blocks insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) for _, block := range trackedBlocks[:3] { @@ -372,7 +374,7 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { db := newTestDB(t) unfinalisedBlocks := createTestBlocks(t, 11, 5) - insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) for _, block := range unfinalisedBlocks { client.On("HeaderByNumber", ctx, block.Number).Return( @@ -407,7 +409,7 @@ func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { db := newTestDB(t) unfinalisedBlocks := createTestBlocks(t, 1, 10) - insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber) // call for removeFinalisedBlocks @@ -426,7 +428,7 @@ func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), }, - unfalisedBlocksID: { + unfinalisedBlocksID: { FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), }, @@ -440,7 +442,7 @@ func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { rd.trackedBlocks = trackedBlocks // make sure we have all blocks in the tracked blocks before removing finalized blocks - require.Len(t, rd.trackedBlocks[unfalisedBlocksID], len(unfinalisedBlocks)) + require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], len(unfinalisedBlocks)) require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks)) // remove finalized blocks @@ -450,7 +452,7 @@ func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { cancel() // make sure all blocks are removed from the tracked blocks - require.Len(t, rd.trackedBlocks[unfalisedBlocksID], 5) + require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], 5) require.Len(t, rd.trackedBlocks[testSubscriber], 5) } From a0f76695927049db366916c18a2b1ff9760593ff Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Tue, 23 Jul 2024 16:29:35 +0200 Subject: [PATCH 07/11] fix: comments fix --- reorgdetector/reorgdetector_test.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 904495bc..d7cf4fbd 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -28,22 +28,16 @@ func newTestDB(tb testing.TB) kv.RwDB { dir := fmt.Sprintf("/tmp/reorgdetector-temp_%v", time.Now().UTC().Format(time.RFC3339Nano)) err := os.Mkdir(dir, 0775) - if err != nil { - tb.Fatal(err) - } + require.NoError(tb, err) db, err := mdbx.NewMDBX(nil). Path(dir). WithTableCfg(tableCfgFunc). Open() - if err != nil { - tb.Fatal(err) - } + require.NoError(tb, err) tb.Cleanup(func() { - if err := os.RemoveAll(dir); err != nil { - tb.Fatal(err) - } + require.NoError(tb, os.RemoveAll(dir)) }) return db From 643978bea05a851755a4893cc7a301fcd1b7c9a9 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 24 Jul 2024 09:58:33 +0200 Subject: [PATCH 08/11] fix: comment fix --- reorgdetector/reorgdetector.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 4b67aa11..77b7b1b8 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -326,6 +326,9 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { lastUnfinalisedBlock uint64 ticker = time.NewTicker(r.waitPeriodBlockAdder) unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() + prevBlock *types.Header + lastBlockFromClient *types.Header + err error ) if len(unfinalisedBlocksMap) > 0 { @@ -335,7 +338,7 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { for { select { case <-ticker.C: - lastBlockFromClient, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) + lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) if err != nil { log.Error("reorg detector - error getting last block from client", "err", err) continue @@ -367,27 +370,36 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- { previousBlock, ok := unfinalisedBlocksMap[i-1] if !ok { - b, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + prevBlock, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) if err != nil { log.Error("reorg detector - error getting previous block", "block", i-1, "err", err) break // stop processing blocks, and we will try to detect it in the next iteration } - previousBlock = block{Num: b.Number.Uint64(), Hash: b.Hash()} + previousBlock = block{Num: prevBlock.Number.Uint64(), Hash: prevBlock.Hash()} } - if previousBlock.Hash == startBlock.ParentHash { - unfinalisedBlocksMap[i] = block{Num: startBlock.Number.Uint64(), Hash: startBlock.Hash()} - } else if previousBlock.Hash != startBlock.ParentHash { + if previousBlock.Hash == lastBlockFromClient.ParentHash { + unfinalisedBlocksMap[i] = block{Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash()} + } else if previousBlock.Hash != lastBlockFromClient.ParentHash { // reorg happened, we will find out from where exactly and report this to subscribers reorgBlock = i } + + lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + log.Error("reorg detector - error getting last block from client", "err", err) + break // stop processing blocks, and we will try to detect it in the next iteration + } } - if reorgBlock > 0 { - r.notifyReorgToAllSubscriptions(reorgBlock) - } else { - r.updateTrackedBlocks(ctx, unfinalisedBlocksID, unfinalisedBlocksMap) + if err == nil { + // if we noticed an error, do not notify or update tracked blocks + if reorgBlock > 0 { + r.notifyReorgToAllSubscriptions(reorgBlock) + } else { + r.updateTrackedBlocks(ctx, unfinalisedBlocksID, unfinalisedBlocksMap) + } } case <-ctx.Done(): return From 8767556792e5bde608af0e17833c4a2cf9c0fb22 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 24 Jul 2024 12:18:11 +0200 Subject: [PATCH 09/11] fix: comments fix --- reorgdetector/reorgdetector.go | 71 +++++++++++++++++++++++++---- reorgdetector/reorgdetector_test.go | 34 ++++---------- 2 files changed, 70 insertions(+), 35 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 77b7b1b8..217372d6 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -224,19 +224,72 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum return ErrInvalidBlockHash } } else { - // block not found in local storage - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + // if we do not have the block, we will check for reorgs and save the block if we can + block, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) if err != nil { return err } - if lastFinalisedBlock.Number.Uint64() >= blockNum { - // block already finalised, no need to track - return nil - } else { - // ReorgDetector has not added the requested block yet, adding it - return r.saveTrackedBlock(ctx, id, block{Num: blockNum, Hash: blockHash}) + + if r.checkForReorg(ctx, id, block) != nil { + return err } } + + return nil +} + +// checkForReorg checks for reorgs and notifies subscribers if a reorg is detected +// if not, block will be saved to the tracked blocks of given subscriber +func (r *ReorgDetector) checkForReorg(ctx context.Context, id string, currentBlock *types.Header) error { + r.trackedBlocksLock.RLock() + subscriberBlocks := r.trackedBlocks[id] + r.trackedBlocksLock.RUnlock() + + if len(subscriberBlocks) == 0 { + // no blocks to check for reorg, just save the block + return r.saveTrackedBlock(ctx, id, block{Num: currentBlock.Number.Uint64(), Hash: currentBlock.Hash()}) + } + + sortedBlocks := subscriberBlocks.getSorted() + lastTrackedBlock := sortedBlocks[len(subscriberBlocks)-1] + + var ( + reorgBlock = uint64(0) + err error + ) + + startBlock := currentBlock + for i := startBlock.Number.Uint64(); i > lastTrackedBlock.Num; i-- { + previousBlock, ok := subscriberBlocks[i-1] + if !ok { + prevBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + return err + } + + previousBlock = block{Num: prevBlock.Number.Uint64(), Hash: prevBlock.Hash()} + } + + if previousBlock.Hash != currentBlock.ParentHash { + // reorg happened, we will find out from where exactly and report this to subscribers + reorgBlock = i + } + + currentBlock, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + return err + } + } + + // if we noticed a reorg, notify subscribers and update tracked blocks + if reorgBlock > 0 { + r.notifyReorgToAllSubscriptions(reorgBlock) + return nil + } + + // no reorg detected, save the block + subscriberBlocks[currentBlock.Number.Uint64()] = block{Num: currentBlock.Number.Uint64(), Hash: currentBlock.Hash()} + return r.updateTrackedBlocks(ctx, unfinalisedBlocksID, subscriberBlocks) } func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error { @@ -381,7 +434,7 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { if previousBlock.Hash == lastBlockFromClient.ParentHash { unfinalisedBlocksMap[i] = block{Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash()} - } else if previousBlock.Hash != lastBlockFromClient.ParentHash { + } else { // reorg happened, we will find out from where exactly and report this to subscribers reorgBlock = i } diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index d7cf4fbd..49c2e7af 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -314,7 +314,7 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { require.ErrorIs(t, err, ErrNotSubscribed) }) - t.Run("no unfinalised blocks - block already finalised", func(t *testing.T) { + t.Run("no unfinalised blocks - no tracked blocks", func(t *testing.T) { t.Parallel() client := NewEthClientMock(t) @@ -322,30 +322,12 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( &types.Header{Number: big.NewInt(10)}, nil, - ).Twice() - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - - _, err = rd.Subscribe(testSubscriber) - require.NoError(t, err) - - err = rd.AddBlockToTrack(ctx, testSubscriber, 9, common.HexToHash("0x123")) // block already finalized - require.NoError(t, err) - - subBlocks := rd.trackedBlocks[testSubscriber] - require.Len(t, subBlocks, 0) // since block to track is already finalized no need to track it - }) - - t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) + ).Once() - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(10)}, nil, - ).Twice() + trackedBlock := &types.Header{Number: big.NewInt(11)} + client.On("HeaderByNumber", ctx, big.NewInt(11)).Return( + trackedBlock, nil, + ).Once() rd, err := newReorgDetector(ctx, client, db) require.NoError(t, err) @@ -353,12 +335,12 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { _, err = rd.Subscribe(testSubscriber) require.NoError(t, err) - err = rd.AddBlockToTrack(ctx, testSubscriber, 11, common.HexToHash("0x123")) // block not finalized + err = rd.AddBlockToTrack(ctx, testSubscriber, trackedBlock.Number.Uint64(), trackedBlock.Hash()) // block not finalized require.NoError(t, err) subBlocks := rd.trackedBlocks[testSubscriber] require.Len(t, subBlocks, 1) - require.Equal(t, subBlocks[11].Hash, common.HexToHash("0x123")) + require.Equal(t, subBlocks[11].Hash, trackedBlock.Hash()) }) t.Run("have unfinalised blocks - block not finalized", func(t *testing.T) { From cebfd04a18e8a63f41ebe65e06dab691e01e2520 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 24 Jul 2024 13:14:53 +0200 Subject: [PATCH 10/11] Revert "fix: comments fix" This reverts commit 8767556792e5bde608af0e17833c4a2cf9c0fb22. --- reorgdetector/reorgdetector.go | 69 +++-------------------------- reorgdetector/reorgdetector_test.go | 34 ++++++++++---- 2 files changed, 32 insertions(+), 71 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 217372d6..5761d850 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -224,72 +224,15 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum return ErrInvalidBlockHash } } else { - // if we do not have the block, we will check for reorgs and save the block if we can - block, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) - if err != nil { - return err - } - - if r.checkForReorg(ctx, id, block) != nil { - return err - } - } - - return nil -} - -// checkForReorg checks for reorgs and notifies subscribers if a reorg is detected -// if not, block will be saved to the tracked blocks of given subscriber -func (r *ReorgDetector) checkForReorg(ctx context.Context, id string, currentBlock *types.Header) error { - r.trackedBlocksLock.RLock() - subscriberBlocks := r.trackedBlocks[id] - r.trackedBlocksLock.RUnlock() - - if len(subscriberBlocks) == 0 { - // no blocks to check for reorg, just save the block - return r.saveTrackedBlock(ctx, id, block{Num: currentBlock.Number.Uint64(), Hash: currentBlock.Hash()}) - } - - sortedBlocks := subscriberBlocks.getSorted() - lastTrackedBlock := sortedBlocks[len(subscriberBlocks)-1] - - var ( - reorgBlock = uint64(0) - err error - ) - - startBlock := currentBlock - for i := startBlock.Number.Uint64(); i > lastTrackedBlock.Num; i-- { - previousBlock, ok := subscriberBlocks[i-1] - if !ok { - prevBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) - if err != nil { - return err - } - - previousBlock = block{Num: prevBlock.Number.Uint64(), Hash: prevBlock.Hash()} - } - - if previousBlock.Hash != currentBlock.ParentHash { - // reorg happened, we will find out from where exactly and report this to subscribers - reorgBlock = i - } - - currentBlock, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) - if err != nil { + // ReorgDetector has not added the requested block yet, + // so we add it to the unfinalised blocks and then to the subscriber blocks as well + block := block{Num: blockNum, Hash: blockHash} + if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block); err != nil { return err } - } - // if we noticed a reorg, notify subscribers and update tracked blocks - if reorgBlock > 0 { - r.notifyReorgToAllSubscriptions(reorgBlock) - return nil + return r.saveTrackedBlock(ctx, id, block) } - - // no reorg detected, save the block - subscriberBlocks[currentBlock.Number.Uint64()] = block{Num: currentBlock.Number.Uint64(), Hash: currentBlock.Hash()} - return r.updateTrackedBlocks(ctx, unfinalisedBlocksID, subscriberBlocks) } func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error { @@ -434,7 +377,7 @@ func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { if previousBlock.Hash == lastBlockFromClient.ParentHash { unfinalisedBlocksMap[i] = block{Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash()} - } else { + } else if previousBlock.Hash != lastBlockFromClient.ParentHash { // reorg happened, we will find out from where exactly and report this to subscribers reorgBlock = i } diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 49c2e7af..d7cf4fbd 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -314,7 +314,7 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { require.ErrorIs(t, err, ErrNotSubscribed) }) - t.Run("no unfinalised blocks - no tracked blocks", func(t *testing.T) { + t.Run("no unfinalised blocks - block already finalised", func(t *testing.T) { t.Parallel() client := NewEthClientMock(t) @@ -322,12 +322,30 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( &types.Header{Number: big.NewInt(10)}, nil, - ).Once() + ).Twice() - trackedBlock := &types.Header{Number: big.NewInt(11)} - client.On("HeaderByNumber", ctx, big.NewInt(11)).Return( - trackedBlock, nil, - ).Once() + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 9, common.HexToHash("0x123")) // block already finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 0) // since block to track is already finalized no need to track it + }) + + t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Twice() rd, err := newReorgDetector(ctx, client, db) require.NoError(t, err) @@ -335,12 +353,12 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { _, err = rd.Subscribe(testSubscriber) require.NoError(t, err) - err = rd.AddBlockToTrack(ctx, testSubscriber, trackedBlock.Number.Uint64(), trackedBlock.Hash()) // block not finalized + err = rd.AddBlockToTrack(ctx, testSubscriber, 11, common.HexToHash("0x123")) // block not finalized require.NoError(t, err) subBlocks := rd.trackedBlocks[testSubscriber] require.Len(t, subBlocks, 1) - require.Equal(t, subBlocks[11].Hash, trackedBlock.Hash()) + require.Equal(t, subBlocks[11].Hash, common.HexToHash("0x123")) }) t.Run("have unfinalised blocks - block not finalized", func(t *testing.T) { From b8e0804c8f2446041f8be7cd76379637777162c2 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 24 Jul 2024 13:21:48 +0200 Subject: [PATCH 11/11] fix: ut fix --- reorgdetector/reorgdetector_test.go | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index d7cf4fbd..55fc9e3d 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -314,29 +314,6 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { require.ErrorIs(t, err, ErrNotSubscribed) }) - t.Run("no unfinalised blocks - block already finalised", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(10)}, nil, - ).Twice() - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - - _, err = rd.Subscribe(testSubscriber) - require.NoError(t, err) - - err = rd.AddBlockToTrack(ctx, testSubscriber, 9, common.HexToHash("0x123")) // block already finalized - require.NoError(t, err) - - subBlocks := rd.trackedBlocks[testSubscriber] - require.Len(t, subBlocks, 0) // since block to track is already finalized no need to track it - }) - t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { t.Parallel() @@ -345,7 +322,7 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( &types.Header{Number: big.NewInt(10)}, nil, - ).Twice() + ).Once() rd, err := newReorgDetector(ctx, client, db) require.NoError(t, err)