diff --git a/common/common.go b/common/common.go new file mode 100644 index 00000000..e211a335 --- /dev/null +++ b/common/common.go @@ -0,0 +1,16 @@ +package common + +import "encoding/binary" + +// BlockNum2Bytes converts a block number to a byte slice +func BlockNum2Bytes(blockNum uint64) []byte { + key := make([]byte, 8) + binary.LittleEndian.PutUint64(key, blockNum) + + return key +} + +// Bytes2BlockNum converts a byte slice to a block number +func Bytes2BlockNum(key []byte) uint64 { + return binary.LittleEndian.Uint64(key) +} diff --git a/localbridgesync/downloader_test.go b/localbridgesync/downloader_test.go index c60f4d3f..553efbec 100644 --- a/localbridgesync/downloader_test.go +++ b/localbridgesync/downloader_test.go @@ -9,6 +9,7 @@ import ( "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge" "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2" + cdkcommon "github.com/0xPolygon/cdk/common" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/log" "github.com/ethereum/go-ethereum" @@ -203,7 +204,7 @@ func generateBridge(t *testing.T, blockNum uint32) (*types.Log, Bridge) { log := &types.Log{ Address: contractAddr, BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))), + BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), Topics: []common.Hash{bridgeEventSignature}, Data: data, } @@ -262,7 +263,7 @@ func generateClaim(t *testing.T, blockNum uint32, event *abi.Event, isV1 bool) ( log := &types.Log{ Address: contractAddr, BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(blockNum2Bytes(uint64(blockNum))), + BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), Topics: []common.Hash{signature}, Data: data, } diff --git a/localbridgesync/processor.go b/localbridgesync/processor.go index 2e6f27a0..69c885b8 100644 --- a/localbridgesync/processor.go +++ b/localbridgesync/processor.go @@ -2,10 +2,10 @@ package localbridgesync import ( "context" - "encoding/binary" "encoding/json" "errors" + "github.com/0xPolygon/cdk/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) @@ -57,6 +57,9 @@ func (p *processor) GetClaimsAndBridges( } defer tx.Rollback() lpb, err := p.getLastProcessedBlockWithTx(tx) + if err != nil { + return nil, err + } if lpb < toBlock { return nil, ErrBlockNotProcessed } @@ -66,11 +69,11 @@ func (p *processor) GetClaimsAndBridges( } defer c.Close() - for k, v, err := c.Seek(blockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { + for k, v, err := c.Seek(common.BlockNum2Bytes(fromBlock)); k != nil; k, v, err = c.Next() { if err != nil { return nil, err } - if bytes2BlockNum(k) > toBlock { + if common.Bytes2BlockNum(k) > toBlock { break } blockEvents := []BridgeEvent{} @@ -99,7 +102,7 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { } else if blockNumBytes == nil { return 0, nil } else { - return bytes2BlockNum(blockNumBytes), nil + return common.Bytes2BlockNum(blockNumBytes), nil } } @@ -113,7 +116,7 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return err } defer c.Close() - firstKey := blockNum2Bytes(firstReorgedBlock) + firstKey := common.BlockNum2Bytes(firstReorgedBlock) for k, _, err := c.Seek(firstKey); k != nil; k, _, err = c.Next() { if err != nil { tx.Rollback() @@ -142,7 +145,7 @@ func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) err tx.Rollback() return err } - if err := 
tx.Put(eventsTable, blockNum2Bytes(blockNum), value); err != nil { + if err := tx.Put(eventsTable, common.BlockNum2Bytes(blockNum), value); err != nil { tx.Rollback() return err } @@ -155,16 +158,6 @@ func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) err } func (p *processor) updateLastProcessedBlock(tx kv.RwTx, blockNum uint64) error { - blockNumBytes := blockNum2Bytes(blockNum) + blockNumBytes := common.BlockNum2Bytes(blockNum) return tx.Put(lastBlockTable, lastBlokcKey, blockNumBytes) } - -func blockNum2Bytes(blockNum uint64) []byte { - key := make([]byte, 8) - binary.LittleEndian.PutUint64(key, blockNum) - return key -} - -func bytes2BlockNum(key []byte) uint64 { - return binary.LittleEndian.Uint64(key) -} diff --git a/reorgdetector/mock_eth_client.go b/reorgdetector/mock_eth_client.go new file mode 100644 index 00000000..add883f6 --- /dev/null +++ b/reorgdetector/mock_eth_client.go @@ -0,0 +1,89 @@ +// Code generated by mockery v2.42.1. DO NOT EDIT. + +package reorgdetector + +import ( + context "context" + big "math/big" + + mock "github.com/stretchr/testify/mock" + + types "github.com/ethereum/go-ethereum/core/types" +) + +// EthClientMock is an autogenerated mock type for the EthClient type +type EthClientMock struct { + mock.Mock +} + +// BlockNumber provides a mock function with given fields: ctx +func (_m *EthClientMock) BlockNumber(ctx context.Context) (uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for BlockNumber") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HeaderByNumber provides a mock function with given fields: ctx, number +func (_m *EthClientMock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + ret := _m.Called(ctx, number) + + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + + var r0 *types.Header + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { + return rf(ctx, number) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { + r0 = rf(ctx, number) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, number) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewEthClientMock creates a new instance of EthClientMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewEthClientMock(t interface { + mock.TestingT + Cleanup(func()) +}) *EthClientMock { + mock := &EthClientMock{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 06d15e85..5761d850 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -2,78 +2,210 @@ package reorgdetector import ( "context" + "encoding/json" "errors" "math/big" + "sort" "sync" "time" + "github.com/0xPolygon/cdk/log" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) // TODO: consider the case where blocks can disappear, current implementation assumes that if there is a reorg, // the client will have at least as many blocks as it had before the reorg, however this may not be the case for L2 const ( - waitPeriodBlockRemover = time.Second * 20 - waitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain + defaultWaitPeriodBlockRemover = time.Second * 20 + defaultWaitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain + + subscriberBlocks = "reorgdetector-subscriberBlocks" + + unfinalisedBlocksID = "unfinalisedBlocks" ) var ( ErrNotSubscribed = errors.New("id not found in subscriptions") ErrInvalidBlockHash = errors.New("the block hash does not match with the expected block hash") + ErrIDReserverd = errors.New("subscription id is reserved") ) +func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + subscriberBlocks: {}, + } +} + +type EthClient interface { + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + BlockNumber(ctx context.Context) (uint64, error) +} + +type block struct { + Num uint64 + Hash common.Hash +} + +type blockMap map[uint64]block + +// newBlockMap returns a new instance of blockMap +func newBlockMap(blocks ...block) blockMap { + blockMap := make(blockMap, len(blocks)) + + for _, b := range blocks { + blockMap[b.Num] = b + } + + return blockMap +} + +// getSorted returns blocks in sorted order +func (bm blockMap) getSorted() []block { + sortedBlocks := make([]block, 0, len(bm)) + + for _, b := range bm { + sortedBlocks = append(sortedBlocks, b) + } + + sort.Slice(sortedBlocks, func(i, j int) bool { + return sortedBlocks[i].Num < sortedBlocks[j].Num + }) + + return sortedBlocks +} + +// getFromBlockSorted returns blocks from blockNum in sorted order without including the blockNum +func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { + sortedBlocks := bm.getSorted() + + index := -1 + for i, b := range sortedBlocks { + if b.Num > blockNum { + index = i + break + } + } + + if index == -1 { + return []block{} + } + + return sortedBlocks[index:] +} + +// getClosestHigherBlock returns the closest higher block to the given blockNum +func (bm blockMap) getClosestHigherBlock(blockNum uint64) (block, bool) { + if block, ok := bm[blockNum]; ok { + return block, true + } + + sorted := bm.getFromBlockSorted(blockNum) + if len(sorted) == 0 { + return block{}, false + } + + return sorted[0], true +} + +// removeRange removes blocks from from to to +func (bm blockMap) removeRange(from, to uint64) { + for i := from; i <= to; i++ { + delete(bm, i) + } +} + type Subscription struct { FirstReorgedBlock chan uint64 
ReorgProcessed chan bool - mu sync.Mutex - pendingReorgsToBeProcessed *sync.WaitGroup + pendingReorgsToBeProcessed sync.WaitGroup } type ReorgDetector struct { - ethClient *ethclient.Client - mu sync.Mutex - unfinalisedBlocks map[uint64]common.Hash - trackedBlocks map[string]map[uint64]common.Hash // TODO: needs persistance! needs to be able to iterate in order! - // the channel is used to notify first invalid block + ethClient EthClient subscriptions map[string]*Subscription + + trackedBlocksLock sync.RWMutex + trackedBlocks map[string]blockMap + + db kv.RwDB + + waitPeriodBlockRemover time.Duration + waitPeriodBlockAdder time.Duration } -func New(ctx context.Context) (*ReorgDetector, error) { - r := &ReorgDetector{} - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) +// New creates a new instance of ReorgDetector +func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector, error) { + db, err := mdbx.NewMDBX(nil). + Path(dbPath). + WithTableCfg(tableCfgFunc). + Open() if err != nil { return nil, err } - r.unfinalisedBlocks[lastFinalisedBlock.Number.Uint64()] = lastFinalisedBlock.Hash() - err = r.cleanStoredSubsBeforeStart(ctx, lastFinalisedBlock.Number.Uint64()) + + return newReorgDetector(ctx, client, db) +} + +// newReorgDetector creates a new instance of ReorgDetector +func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) { + return newReorgDetectorWithPeriods(ctx, client, db, defaultWaitPeriodBlockRemover, defaultWaitPeriodBlockAdder) +} + +// newReorgDetectorWithPeriods creates a new instance of ReorgDetector with custom wait periods +func newReorgDetectorWithPeriods(ctx context.Context, client EthClient, db kv.RwDB, + waitPeriodBlockRemover, waitPeriodBlockAdder time.Duration) (*ReorgDetector, error) { + r := &ReorgDetector{ + ethClient: client, + db: db, + subscriptions: make(map[string]*Subscription, 0), + waitPeriodBlockRemover: waitPeriodBlockRemover, + waitPeriodBlockAdder: waitPeriodBlockAdder, + } + + trackedBlocks, err := r.getTrackedBlocks(ctx) if err != nil { return nil, err } + + r.trackedBlocks = trackedBlocks + + lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + return nil, err + } + + if err = r.cleanStoredSubsBeforeStart(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { + return nil, err + } + return r, nil } func (r *ReorgDetector) Start(ctx context.Context) { - var lastFinalisedBlock uint64 - for lastFinalisedBlock = range r.unfinalisedBlocks { - } go r.removeFinalisedBlocks(ctx) - go r.addUnfinalisedBlocks(ctx, lastFinalisedBlock+1) + go r.addUnfinalisedBlocks(ctx) } -func (r *ReorgDetector) Subscribe(id string) *Subscription { +func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) { + if id == unfinalisedBlocksID { + return nil, ErrIDReserverd + } + if sub, ok := r.subscriptions[id]; ok { - return sub + return sub, nil } sub := &Subscription{ FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), } r.subscriptions[id] = sub - r.trackedBlocks[id] = make(map[uint64]common.Hash) - return sub + + return sub, nil } func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error { @@ -82,165 +214,351 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum } else { // In case there are reorgs being processed, wait // Note that this also makes any addition to 
trackedBlocks[id] safe - sub.mu.Lock() - defer sub.mu.Unlock() sub.pendingReorgsToBeProcessed.Wait() } - if actualHash, ok := r.unfinalisedBlocks[blockNum]; ok { - if actualHash == blockHash { - r.trackedBlocks[id][blockNum] = blockHash - return nil + + if actualHash, ok := r.getUnfinalisedBlocksMap()[blockNum]; ok { + if actualHash.Hash == blockHash { + return r.saveTrackedBlock(ctx, id, block{Num: blockNum, Hash: blockHash}) } else { return ErrInvalidBlockHash } } else { - // block not found in local storage - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { + // ReorgDetector has not added the requested block yet, + // so we add it to the unfinalised blocks and then to the subscriber blocks as well + block := block{Num: blockNum, Hash: blockHash} + if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block); err != nil { return err } - if lastFinalisedBlock.Number.Uint64() >= blockNum { - // block already finalised, no need to track - return nil - } else { - // ReorgDetector has not added the requested block yet, adding it - r.trackedBlocks[id][blockNum] = blockHash - return nil - } + + return r.saveTrackedBlock(ctx, id, block) } } func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error { - for id := range r.trackedBlocks { - sub := &Subscription{ + blocksGotten := make(map[uint64]common.Hash, 0) + + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + for id, blocks := range r.trackedBlocks { + r.subscriptions[id] = &Subscription{ FirstReorgedBlock: make(chan uint64), ReorgProcessed: make(chan bool), } - r.subscriptions[id] = sub - if err := r.cleanStoredSubBeforeStart(ctx, id, latestFinalisedBlock); err != nil { - return err + + var ( + lastTrackedBlock uint64 + block block + actualBlockHash common.Hash + ok bool + ) + + if len(blocks) == 0 { + continue // nothing to process for this subscriber } - } - return nil -} -func (r *ReorgDetector) cleanStoredSubBeforeStart(ctx context.Context, id string, latestFinalisedBlock uint64) error { - blocks := r.trackedBlocks[id] - var lastTrackedBlock uint64 // TODO: get the greatest block num tracked - for expectedBlockNum, expectedBlockHash := range blocks { // should iterate in order - actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(expectedBlockNum))) - if err != nil { - return err + sortedBlocks := blocks.getSorted() + lastTrackedBlock = sortedBlocks[len(blocks)-1].Num + + for _, block = range sortedBlocks { + if actualBlockHash, ok = blocksGotten[block.Num]; !ok { + actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) + if err != nil { + return err + } + + actualBlockHash = actualBlock.Hash() + } + + if actualBlockHash != block.Hash { + // reorg detected, notify subscriber + go r.notifyReorgToSubscription(id, block.Num) + + // remove the reorged blocks from the tracked blocks + blocks.removeRange(block.Num, lastTrackedBlock) + + break + } else if block.Num <= latestFinalisedBlock { + delete(blocks, block.Num) + } } - if actualBlock.Hash() != expectedBlockHash { - r.subscriptions[id].pendingReorgsToBeProcessed.Add(1) - go r.notifyReorgToSubscription(id, expectedBlockNum, lastTrackedBlock) - return nil - } else if expectedBlockNum < latestFinalisedBlock { - delete(blocks, expectedBlockNum) + + // if we processed finalized or reorged blocks, update the tracked blocks in memory and db + if err := r.updateTrackedBlocksNoLock(ctx, id, blocks); err != nil { + 
return err } } + return nil } func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { + ticker := time.NewTicker(r.waitPeriodBlockRemover) + for { - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { - // TODO: handle error - return - } - for i := lastFinalisedBlock.Number.Uint64(); i >= 0; i-- { - if _, ok := r.unfinalisedBlocks[i]; ok { - r.mu.Lock() - delete(r.unfinalisedBlocks, i) - r.mu.Unlock() - } else { - break + select { + case <-ticker.C: + lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + log.Error("reorg detector - error getting last finalised block", "err", err) + + continue } - for id, blocks := range r.trackedBlocks { - r.subscriptions[id].mu.Lock() - delete(blocks, i) - r.subscriptions[id].mu.Unlock() + + if err := r.removeTrackedBlocks(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { + log.Error("reorg detector - error removing tracked blocks", "err", err) + + continue } + case <-ctx.Done(): + return } - time.Sleep(waitPeriodBlockRemover) } } -func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context, initBlock uint64) { +func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { var ( - firstBlockReorged uint64 - err error + lastUnfinalisedBlock uint64 + ticker = time.NewTicker(r.waitPeriodBlockAdder) + unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() + prevBlock *types.Header + lastBlockFromClient *types.Header + err error ) - currentBlock := initBlock - lastBlockFromClient := currentBlock - 1 + + if len(unfinalisedBlocksMap) > 0 { + lastUnfinalisedBlock = unfinalisedBlocksMap.getSorted()[len(unfinalisedBlocksMap)-1].Num + } + for { - for currentBlock > lastBlockFromClient { - lastBlockFromClient, err = r.ethClient.BlockNumber(ctx) + select { + case <-ticker.C: + lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) if err != nil { - // TODO: handle error - return + log.Error("reorg detector - error getting last block from client", "err", err) + continue } - time.Sleep(waitPeriodBlockAdder) - } - block, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(currentBlock))) - if err != nil { - // TODO: handle error - return - } - prevBlockHash, ok := r.unfinalisedBlocks[currentBlock-1] - if !ok || block.ParentHash == prevBlockHash { - // previous block is correct or there is no previous block, add current block - r.mu.Lock() - r.unfinalisedBlocks[currentBlock] = block.Hash() - r.mu.Unlock() - if firstBlockReorged > 0 { - r.notifyReorg(currentBlock, firstBlockReorged) + + if lastBlockFromClient.Number.Uint64() < lastUnfinalisedBlock { + // a reorg probably happened, and the client has less blocks than we have + // we should wait for the client to catch up so we can be sure + continue } - currentBlock++ - firstBlockReorged = 0 - } else { - // previous block is reorged: - // 1. add a pending reorg to be processed for all the subscribers (so they don't add more blocks) - // 2. 
remove block - for _, sub := range r.subscriptions { - sub.mu.Lock() - sub.pendingReorgsToBeProcessed.Add(1) - sub.mu.Unlock() + + unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() + if len(unfinalisedBlocksMap) == 0 { + // no unfinalised blocks, just add this block to the map + if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block{ + Num: lastBlockFromClient.Number.Uint64(), + Hash: lastBlockFromClient.Hash(), + }); err != nil { + log.Error("reorg detector - error saving unfinalised block", "block", lastBlockFromClient.Number.Uint64(), "err", err) + } + + continue + } + + startBlock := lastBlockFromClient + unfinalisedBlocksSorted := unfinalisedBlocksMap.getSorted() + reorgBlock := uint64(0) + + for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- { + previousBlock, ok := unfinalisedBlocksMap[i-1] + if !ok { + prevBlock, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + log.Error("reorg detector - error getting previous block", "block", i-1, "err", err) + break // stop processing blocks, and we will try to detect it in the next iteration + } + + previousBlock = block{Num: prevBlock.Number.Uint64(), Hash: prevBlock.Hash()} + } + + if previousBlock.Hash == lastBlockFromClient.ParentHash { + unfinalisedBlocksMap[i] = block{Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash()} + } else if previousBlock.Hash != lastBlockFromClient.ParentHash { + // reorg happened, we will find out from where exactly and report this to subscribers + reorgBlock = i + } + + lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) + if err != nil { + log.Error("reorg detector - error getting last block from client", "err", err) + break // stop processing blocks, and we will try to detect it in the next iteration + } } - r.mu.Lock() - delete(r.unfinalisedBlocks, currentBlock-1) - r.mu.Unlock() - currentBlock-- - if firstBlockReorged == 0 { - firstBlockReorged = currentBlock + + if err == nil { + // if we noticed an error, do not notify or update tracked blocks + if reorgBlock > 0 { + r.notifyReorgToAllSubscriptions(reorgBlock) + } else { + r.updateTrackedBlocks(ctx, unfinalisedBlocksID, unfinalisedBlocksMap) + } } + case <-ctx.Done(): + return } } } -func (r *ReorgDetector) notifyReorg(fromBlock, toBlock uint64) { +func (r *ReorgDetector) notifyReorgToAllSubscriptions(reorgBlock uint64) { for id := range r.subscriptions { - go r.notifyReorgToSubscription(id, fromBlock, toBlock) + r.trackedBlocksLock.RLock() + subscriberBlocks := r.trackedBlocks[id] + r.trackedBlocksLock.RUnlock() + + closestBlock, exists := subscriberBlocks.getClosestHigherBlock(reorgBlock) + + if exists { + go r.notifyReorgToSubscription(id, closestBlock.Num) + + // remove reorged blocks from tracked blocks + sorted := subscriberBlocks.getSorted() + subscriberBlocks.removeRange(closestBlock.Num, sorted[len(sorted)-1].Num) + if err := r.updateTrackedBlocks(context.Background(), id, subscriberBlocks); err != nil { + log.Error("reorg detector - error updating tracked blocks", "err", err) + } + } } } -func (r *ReorgDetector) notifyReorgToSubscription(id string, fromBlock, toBlock uint64) { - blocks := r.trackedBlocks[id] +func (r *ReorgDetector) notifyReorgToSubscription(id string, reorgBlock uint64) { + if id == unfinalisedBlocksID { + // unfinalised blocks are not subscribers, and reorg block should be > 0 + return + } + sub := r.subscriptions[id] - var found bool - for i := fromBlock; i <= toBlock; i++ { - if _, ok := blocks[i]; ok { - if 
!found { - // notify about the first reorged block that was tracked - // and wait for the receiver to process - found = true - sub.FirstReorgedBlock <- i - <-sub.ReorgProcessed - } - delete(blocks, i) + sub.pendingReorgsToBeProcessed.Add(1) + + // notify about the first reorged block that was tracked + // and wait for the receiver to process + sub.FirstReorgedBlock <- reorgBlock + <-sub.ReorgProcessed + + sub.pendingReorgsToBeProcessed.Done() +} + +// getUnfinalisedBlocksMap returns the map of unfinalised blocks +func (r *ReorgDetector) getUnfinalisedBlocksMap() blockMap { + r.trackedBlocksLock.RLock() + defer r.trackedBlocksLock.RUnlock() + + return r.trackedBlocks[unfinalisedBlocksID] +} + +// getTrackedBlocks returns a list of tracked blocks for each subscriber from db +func (r *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]blockMap, error) { + tx, err := r.db.BeginRo(ctx) + if err != nil { + return nil, err + } + + defer tx.Rollback() + + cursor, err := tx.Cursor(subscriberBlocks) + if err != nil { + return nil, err + } + + defer cursor.Close() + + trackedBlocks := make(map[string]blockMap, 0) + + for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { + if err != nil { + return nil, err + } + + var blocks []block + if err := json.Unmarshal(v, &blocks); err != nil { + return nil, err } + + trackedBlocks[string(k)] = newBlockMap(blocks...) } - sub.pendingReorgsToBeProcessed.Done() + + if _, ok := trackedBlocks[unfinalisedBlocksID]; !ok { + // add unfinalised blocks to tracked blocks map if not present in db + trackedBlocks[unfinalisedBlocksID] = newBlockMap() + } + + return trackedBlocks, nil +} + +// saveTrackedBlock saves the tracked block for a subscriber in db and in memory +func (r *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b block) error { + tx, err := r.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + r.trackedBlocksLock.Lock() + + subscriberBlockMap, ok := r.trackedBlocks[id] + if !ok { + subscriberBlockMap = newBlockMap(b) + r.trackedBlocks[id] = subscriberBlockMap + } + + r.trackedBlocksLock.Unlock() + + raw, err := json.Marshal(subscriberBlockMap.getSorted()) + if err != nil { + + return err + } + + return tx.Put(subscriberBlocks, []byte(id), raw) +} + +// removeTrackedBlocks removes the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBlock uint64) error { + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + for id := range r.trackedBlocks { + newTrackedBlocks := r.trackedBlocks[id].getFromBlockSorted(lastFinalizedBlock) + + if err := r.updateTrackedBlocksNoLock(ctx, id, newBlockMap(newTrackedBlocks...)); err != nil { + return err + } + } + + return nil +} + +// updateTrackedBlocks updates the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) updateTrackedBlocks(ctx context.Context, id string, blocks blockMap) error { + r.trackedBlocksLock.Lock() + defer r.trackedBlocksLock.Unlock() + + return r.updateTrackedBlocksNoLock(ctx, id, blocks) +} + +// updateTrackedBlocksNoLock updates the tracked blocks for a subscriber in db and in memory +func (r *ReorgDetector) updateTrackedBlocksNoLock(ctx context.Context, id string, blocks blockMap) error { + tx, err := r.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + raw, err := json.Marshal(blocks.getSorted()) + if err != nil { + return err + } + + if err := tx.Put(subscriberBlocks, []byte(id), raw); 
err != nil { + return err + } + + r.trackedBlocks[id] = blocks + + return nil } diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go new file mode 100644 index 00000000..55fc9e3d --- /dev/null +++ b/reorgdetector/reorgdetector_test.go @@ -0,0 +1,461 @@ +package reorgdetector + +import ( + "context" + "encoding/json" + "errors" + "fmt" + big "math/big" + "os" + "reflect" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + types "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/stretchr/testify/require" +) + +const testSubscriber = "testSubscriber" + +// newTestDB creates new instance of db used by tests. +func newTestDB(tb testing.TB) kv.RwDB { + tb.Helper() + + dir := fmt.Sprintf("/tmp/reorgdetector-temp_%v", time.Now().UTC().Format(time.RFC3339Nano)) + err := os.Mkdir(dir, 0775) + + require.NoError(tb, err) + + db, err := mdbx.NewMDBX(nil). + Path(dir). + WithTableCfg(tableCfgFunc). + Open() + require.NoError(tb, err) + + tb.Cleanup(func() { + require.NoError(tb, os.RemoveAll(dir)) + }) + + return db +} + +func TestBlockMap(t *testing.T) { + t.Parallel() + + // Create a new block map + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + ) + + t.Run("getSorted", func(t *testing.T) { + t.Parallel() + + sortedBlocks := bm.getSorted() + expectedSortedBlocks := []block{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { + t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) + } + }) + + t.Run("getFromBlockSorted", func(t *testing.T) { + t.Parallel() + + fromBlockSorted := bm.getFromBlockSorted(2) + expectedFromBlockSorted := []block{ + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } + + // Test getFromBlockSorted function when blockNum is greater than the last block + fromBlockSorted = bm.getFromBlockSorted(4) + expectedFromBlockSorted = []block{} + if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { + t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) + } + }) + + t.Run("getClosestHigherBlock", func(t *testing.T) { + t.Parallel() + + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + ) + + // Test when the blockNum exists in the block map + b, exists := bm.getClosestHigherBlock(2) + require.True(t, exists) + expectedBlock := block{Num: 2, Hash: common.HexToHash("0x456")} + if b != expectedBlock { + t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) + } + + // Test when the blockNum does not exist in the block map + b, exists = bm.getClosestHigherBlock(4) + require.False(t, exists) + expectedBlock = block{Num: 0, Hash: common.Hash{}} + if b != expectedBlock { + t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", 
expectedBlock, b) + } + }) + + t.Run("removeRange", func(t *testing.T) { + t.Parallel() + + bm := newBlockMap( + block{Num: 1, Hash: common.HexToHash("0x123")}, + block{Num: 2, Hash: common.HexToHash("0x456")}, + block{Num: 3, Hash: common.HexToHash("0x789")}, + block{Num: 4, Hash: common.HexToHash("0xabc")}, + block{Num: 5, Hash: common.HexToHash("0xdef")}, + ) + + bm.removeRange(3, 5) + + expectedBlocks := []block{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + } + + sortedBlocks := bm.getSorted() + + if !reflect.DeepEqual(sortedBlocks, expectedBlocks) { + t.Errorf("removeRange() failed, expected: %v, got: %v", expectedBlocks, sortedBlocks) + } + }) +} + +func TestReorgDetector_New(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("first initialization, no data", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(100)}, nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 1) + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] + require.True(t, exists) + require.Empty(t, unfinalisedBlocksMap) + }) + + t.Run("getting last finalized block failed", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + expectedErr := errors.New("some error") + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(nil, expectedErr) + + _, err := newReorgDetector(ctx, client, db) + require.ErrorIs(t, err, expectedErr) + }) + + t.Run("have tracked blocks and subscriptions no reorg - all blocks finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + testBlocks := createTestBlocks(t, 1, 6) + unfinalisedBlocks := testBlocks[:5] + testSubscriberBlocks := testBlocks[:3] + + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + testBlocks[len(testBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] + require.True(t, exists) + require.Len(t, unfinalisedBlocksMap, 0) // since all blocks are finalized + + testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] + require.True(t, exists) + require.Len(t, testSubscriberMap, 0) // since all blocks are finalized + }) + + t.Run("have tracked blocks and subscriptions no reorg - not all blocks finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + testBlocks := createTestBlocks(t, 1, 7) + unfinalisedBlocks := testBlocks[:6] + testSubscriberBlocks := testBlocks[:4] + + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, 
big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + testSubscriberBlocks[len(testSubscriberBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] + require.True(t, exists) + require.Len(t, unfinalisedBlocksMap, len(unfinalisedBlocks)-len(testSubscriberBlocks)) // since all blocks are finalized + + testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] + require.True(t, exists) + require.Len(t, testSubscriberMap, 0) // since all blocks are finalized + }) + + t.Run("have tracked blocks and subscriptions - reorg happened", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + trackedBlocks := createTestBlocks(t, 1, 5) + testSubscriberBlocks := trackedBlocks[:5] + + insertTestData(t, ctx, db, nil, unfinalisedBlocksID) // no unfinalised blocks + insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) + + for _, block := range trackedBlocks[:3] { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + reorgedBlocks := createTestBlocks(t, 4, 2) // block 4, and 5 are reorged + reorgedBlocks[0].ParentHash = trackedBlocks[2].Hash() // block 4 is reorged but his parent is block 3 + reorgedBlocks[1].ParentHash = reorgedBlocks[0].Hash() // block 5 is reorged but his parent is block 4 + + client.On("HeaderByNumber", ctx, reorgedBlocks[0].Number).Return( + reorgedBlocks[0], nil, + ) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + reorgedBlocks[len(reorgedBlocks)-1], nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + + // we wait for the subscriber to be notified about the reorg + firstReorgedBlock := <-rd.subscriptions[testSubscriber].FirstReorgedBlock + require.Equal(t, reorgedBlocks[0].Number.Uint64(), firstReorgedBlock) + + // all blocks should be cleaned from the tracked blocks + // since subscriber had 5 blocks, 3 were finalized, and 2 were reorged but also finalized + subscriberBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subscriberBlocks, 0) + }) +} + +func TestReorgDetector_AddBlockToTrack(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("no subscription", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ) + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 1, common.HexToHash("0x123")) + require.ErrorIs(t, err, ErrNotSubscribed) + }) + + t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Once() + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 11, common.HexToHash("0x123")) // block not finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 1) + 
require.Equal(t, subBlocks[11].Hash, common.HexToHash("0x123")) + }) + + t.Run("have unfinalised blocks - block not finalized", func(t *testing.T) { + t.Parallel() + + client := NewEthClientMock(t) + db := newTestDB(t) + + unfinalisedBlocks := createTestBlocks(t, 11, 5) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) + + for _, block := range unfinalisedBlocks { + client.On("HeaderByNumber", ctx, block.Number).Return( + block, nil, + ) + } + + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(10)}, nil, + ).Once() + + rd, err := newReorgDetector(ctx, client, db) + require.NoError(t, err) + + _, err = rd.Subscribe(testSubscriber) + require.NoError(t, err) + + err = rd.AddBlockToTrack(ctx, testSubscriber, 11, unfinalisedBlocks[0].Hash()) // block not finalized + require.NoError(t, err) + + subBlocks := rd.trackedBlocks[testSubscriber] + require.Len(t, subBlocks, 1) + require.Equal(t, subBlocks[11].Hash, unfinalisedBlocks[0].Hash()) + }) +} + +func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + client := NewEthClientMock(t) + db := newTestDB(t) + + unfinalisedBlocks := createTestBlocks(t, 1, 10) + insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber) + + // call for removeFinalisedBlocks + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(5)}, nil, + ) + + rd := &ReorgDetector{ + ethClient: client, + db: db, + trackedBlocks: make(map[string]blockMap), + waitPeriodBlockRemover: 100 * time.Millisecond, + waitPeriodBlockAdder: 100 * time.Millisecond, + subscriptions: map[string]*Subscription{ + testSubscriber: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + unfinalisedBlocksID: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + }, + } + + trackedBlocks, err := rd.getTrackedBlocks(ctx) + require.NoError(t, err) + require.Len(t, trackedBlocks, 2) + + rd.trackedBlocks = trackedBlocks + + // make sure we have all blocks in the tracked blocks before removing finalized blocks + require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], len(unfinalisedBlocks)) + require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks)) + + // remove finalized blocks + go rd.removeFinalisedBlocks(ctx) + + time.Sleep(3 * time.Second) // wait for the go routine to remove the finalized blocks + cancel() + + // make sure all blocks are removed from the tracked blocks + require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], 5) + require.Len(t, rd.trackedBlocks[testSubscriber], 5) +} + +func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { + t.Helper() + + blocks := make([]*types.Header, 0, count) + for i := startBlock; i < startBlock+count; i++ { + blocks = append(blocks, &types.Header{Number: big.NewInt(int64(i))}) + } + + return blocks +} + +func insertTestData(t *testing.T, ctx context.Context, db kv.RwDB, blocks []*types.Header, id string) { + t.Helper() + + // Insert some test data + err := db.Update(ctx, func(tx kv.RwTx) error { + + blockMap := newBlockMap() + for _, b := range blocks { + blockMap[b.Number.Uint64()] = block{b.Number.Uint64(), b.Hash()} + } + + raw, err := json.Marshal(blockMap.getSorted()) + if err != nil { + return err + } + + return tx.Put(subscriberBlocks, []byte(id), 
raw) + }) + + require.NoError(t, err) +} diff --git a/test/Makefile b/test/Makefile index 68b245f6..817e042b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,9 +1,15 @@ .PHONY: generate-mocks -generate-mocks: generate-mocks-localbridgesync +generate-mocks: + $(MAKE) generate-mocks-localbridgesync + $(MAKE) generate-mocks-reorgdetector .PHONY: generate-mocks-localbridgesync -generate-mocks-localbridgesync: ## Generates mocks for localbridgesync , using mockery tool +generate-mocks-localbridgesync: ## Generates mocks for localbridgesync, using mockery tool export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClienter --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=L2Mock --filename=mock_l2_test.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=downloaderFull --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=DownloaderMock --filename=mock_downloader_test.go export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=processorInterface --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ProcessorMock --filename=mock_processor_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go \ No newline at end of file + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go + +.PHONY: generate-mocks-reorgdetector +generate-mocks-reorgdetector: ## Generates mocks for reorgdetector, using mockery tool + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClient --dir=../reorgdetector --output=../reorgdetector --outpkg=reorgdetector --inpackage --structname=EthClientMock --filename=mock_eth_client.go \ No newline at end of file