diff --git a/aggoracle/chaingersender/evm.go b/aggoracle/chaingersender/evm.go index 3c9a8771..b3b6e405 100644 --- a/aggoracle/chaingersender/evm.go +++ b/aggoracle/chaingersender/evm.go @@ -15,10 +15,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var ( - waitPeriodMonitorTx = time.Second * 5 -) - type EthClienter interface { ethereum.LogFilterer ethereum.BlockNumberReader @@ -34,12 +30,13 @@ type EthTxManager interface { } type EVMChainGERSender struct { - gerContract *pessimisticglobalexitroot.Pessimisticglobalexitroot - gerAddr common.Address - sender common.Address - client EthClienter - ethTxMan EthTxManager - gasOffset uint64 + gerContract *pessimisticglobalexitroot.Pessimisticglobalexitroot + gerAddr common.Address + sender common.Address + client EthClienter + ethTxMan EthTxManager + gasOffset uint64 + waitPeriodMonitorTx time.Duration } func NewEVMChainGERSender( @@ -47,18 +44,20 @@ func NewEVMChainGERSender( client EthClienter, ethTxMan EthTxManager, gasOffset uint64, + waitPeriodMonitorTx time.Duration, ) (*EVMChainGERSender, error) { gerContract, err := pessimisticglobalexitroot.NewPessimisticglobalexitroot(globalExitRoot, client) if err != nil { return nil, err } return &EVMChainGERSender{ - gerContract: gerContract, - gerAddr: globalExitRoot, - sender: sender, - client: client, - ethTxMan: ethTxMan, - gasOffset: gasOffset, + gerContract: gerContract, + gerAddr: globalExitRoot, + sender: sender, + client: client, + ethTxMan: ethTxMan, + gasOffset: gasOffset, + waitPeriodMonitorTx: waitPeriodMonitorTx, }, nil } @@ -81,7 +80,7 @@ func (c *EVMChainGERSender) UpdateGERWaitUntilMined(ctx context.Context, ger com return err } for { - time.Sleep(waitPeriodMonitorTx) + time.Sleep(c.waitPeriodMonitorTx) res, err := c.ethTxMan.Result(ctx, id) if err != nil { log.Error("error calling ethTxMan.Result: ", err) diff --git a/aggoracle/e2e_test.go b/aggoracle/e2e_test.go index b8b76f4b..7d616be9 100644 --- a/aggoracle/e2e_test.go +++ 
b/aggoracle/e2e_test.go @@ -2,6 +2,7 @@ package aggoracle_test import ( "context" + "errors" "math/big" "strconv" "testing" @@ -16,6 +17,7 @@ import ( "github.com/0xPolygon/cdk/log" "github.com/0xPolygon/cdk/reorgdetector" ethtxmanager "github.com/0xPolygonHermez/zkevm-ethtx-manager/ethtxmanager" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -29,7 +31,7 @@ func TestEVM(t *testing.T) { ctx := context.Background() l1Client, syncer, gerL1Contract, authL1 := commonSetup(t) sender := evmSetup(t) - oracle, err := aggoracle.New(sender, l1Client.Client(), syncer, etherman.LatestBlock) + oracle, err := aggoracle.New(sender, l1Client.Client(), syncer, etherman.LatestBlock, time.Millisecond) require.NoError(t, err) go oracle.Start(ctx) @@ -57,9 +59,9 @@ func commonSetup(t *testing.T) ( require.NoError(t, err) // Syncer dbPathSyncer := t.TempDir() - syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), 32) + syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), 32, time.Millisecond) require.NoError(t, err) - go syncer.Sync(ctx) + go syncer.Start(ctx) return l1Client, syncer, gerL1Contract, authL1 } @@ -81,24 +83,56 @@ func evmSetup(t *testing.T) aggoracle.ChainSender { log.Error(err) return } - tx := types.NewTx(&types.LegacyTx{ + gas, err := l2Client.Client().EstimateGas(ctx, ethereum.CallMsg{ + From: authL2.From, To: args.Get(1).(*common.Address), - Nonce: nonce, Value: big.NewInt(0), Data: args.Get(4).([]byte), }) + if err != nil { + log.Error(err) + res, err := l2Client.Client().CallContract(ctx, ethereum.CallMsg{ + From: authL2.From, + To: args.Get(1).(*common.Address), + Value: big.NewInt(0), + Data: args.Get(4).([]byte), + }, nil) + log.Debugf("contract call: %s", res) + if err != nil { + log.Error(err) + } + return + } 
+ price, err := l2Client.Client().SuggestGasPrice(ctx) + if err != nil { + log.Error(err) + } + tx := types.NewTx(&types.LegacyTx{ + To: args.Get(1).(*common.Address), + Nonce: nonce, + Value: big.NewInt(0), + Data: args.Get(4).([]byte), + Gas: gas, + GasPrice: price, + }) + tx.Gas() signedTx, err := authL2.Signer(authL2.From, tx) if err != nil { log.Error(err) return } - l2Client.Client().SendTransaction(ctx, signedTx) + err = l2Client.Client().SendTransaction(ctx, signedTx) + if err != nil { + log.Error(err) + return + } l2Client.Commit() }). Return(common.Hash{}, nil) // res, err := c.ethTxMan.Result(ctx, id) - ethTxManMock.On("Add", mock.Anything, mock.Anything).Return(ethtxmanager.MonitoredTxStatusMined, nil) - sender, err := chaingersender.NewEVMChainGERSender(gerL2Addr, authL2.From, l2Client.Client(), ethTxManMock, 0) + ethTxManMock.On("Result", mock.Anything, mock.Anything). + Return(ethtxmanager.MonitoredTxResult{Status: ethtxmanager.MonitoredTxStatusMined}, nil) + sender, err := chaingersender.NewEVMChainGERSender(gerL2Addr, authL2.From, l2Client.Client(), ethTxManMock, 0, time.Millisecond*50) require.NoError(t, err) return sender @@ -143,8 +177,18 @@ func newSimulatedEVMAggSovereignChain(auth *bind.TransactOpts) ( client = simulated.NewBackend(genesisAlloc, simulated.WithBlockGasLimit(blockGasLimit)) gerAddr, _, gerContract, err = gerContractEVMChain.DeployPessimisticglobalexitrootnopush0(auth, client.Client(), auth.From) + if err != nil { + return + } + client.Commit() + _GLOBAL_EXIT_ROOT_SETTER_ROLE := common.HexToHash("0x7b95520991dfda409891be0afa2635b63540f92ee996fda0bf695a166e5c5176") + _, err = gerContract.GrantRole(auth, _GLOBAL_EXIT_ROOT_SETTER_ROLE, auth.From) client.Commit() + hasRole, _ := gerContract.HasRole(&bind.CallOpts{Pending: false}, _GLOBAL_EXIT_ROOT_SETTER_ROLE, auth.From) + if !hasRole { + err = errors.New("failed to set role") + } return } @@ -159,7 +203,7 @@ func runTest( _, err := gerL1Contract.UpdateExitRoot(authL1, 
common.HexToHash(strconv.Itoa(i))) require.NoError(t, err) l1Client.Commit() - time.Sleep(time.Second * 30) + time.Sleep(time.Millisecond * 50) expectedGER, err := gerL1Contract.GetLastGlobalExitRoot(&bind.CallOpts{Pending: false}) require.NoError(t, err) isInjected, err := sender.IsGERAlreadyInjected(expectedGER) diff --git a/aggoracle/oracle.go b/aggoracle/oracle.go index f4957c7a..8f4b7a67 100644 --- a/aggoracle/oracle.go +++ b/aggoracle/oracle.go @@ -13,10 +13,6 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var ( - waitPeriodNextGER = time.Second * 30 -) - type EthClienter interface { ethereum.LogFilterer ethereum.BlockNumberReader @@ -46,6 +42,7 @@ func New( l1Client EthClienter, l1InfoTreeSyncer L1InfoTreer, blockFinalityType etherman.BlockNumberFinality, + waitPeriodNextGER time.Duration, ) (*AggOracle, error) { ticker := time.NewTicker(waitPeriodNextGER) finality, err := blockFinalityType.ToBlockNum() diff --git a/l1infotreesync/downloader.go b/l1infotreesync/downloader.go index 549e482e..ba9b219b 100644 --- a/l1infotreesync/downloader.go +++ b/l1infotreesync/downloader.go @@ -1,13 +1,10 @@ package l1infotreesync import ( - "context" - "math/big" - "time" + "fmt" "github.com/0xPolygon/cdk-contracts-tooling/contracts/elderberry/polygonzkevmglobalexitrootv2" - "github.com/0xPolygon/cdk/etherman" - "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -16,7 +13,6 @@ import ( ) var ( - waitForNewBlocksPeriod = time.Millisecond * 100 updateL1InfoTreeSignature = crypto.Keccak256Hash([]byte("UpdateL1InfoTree(bytes32,bytes32)")) ) @@ -27,172 +23,22 @@ type EthClienter interface { bind.ContractBackend } -type downloaderInterface interface { - waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) - getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []block - getLogs(ctx 
context.Context, fromBlock, toBlock uint64) []types.Log - appendLog(b *block, l types.Log) - getBlockHeader(ctx context.Context, blockNum uint64) blockHeader -} - type L1InfoTreeUpdate struct { MainnetExitRoot common.Hash RollupExitRoot common.Hash } -type block struct { - blockHeader - Events []L1InfoTreeUpdate -} - -type blockHeader struct { - Num uint64 - Hash common.Hash - ParentHash common.Hash - Timestamp uint64 -} - -type downloader struct { - syncBlockChunkSize uint64 - downloaderInterface -} - -func newDownloader( - GERAddr common.Address, - ethClient EthClienter, - syncBlockChunkSize uint64, - blockFinalityType etherman.BlockNumberFinality, -) (*downloader, error) { - GERContract, err := polygonzkevmglobalexitrootv2.NewPolygonzkevmglobalexitrootv2(GERAddr, ethClient) - if err != nil { - return nil, err - } - finality, err := blockFinalityType.ToBlockNum() +func buildAppender(client EthClienter, globalExitRoot common.Address) (sync.LogAppenderMap, error) { + contract, err := polygonzkevmglobalexitrootv2.NewPolygonzkevmglobalexitrootv2(globalExitRoot, client) if err != nil { return nil, err } - return &downloader{ - syncBlockChunkSize: syncBlockChunkSize, - downloaderInterface: &downloaderImplementation{ - GERAddr: GERAddr, - GERContract: GERContract, - ethClient: ethClient, - blockFinality: finality, - }, - }, nil -} - -func (d *downloader) download(ctx context.Context, fromBlock uint64, downloadedCh chan block) { - lastBlock := d.waitForNewBlocks(ctx, 0) - for { - select { - case <-ctx.Done(): - log.Debug("closing channel") - close(downloadedCh) - return - default: - } - toBlock := fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock - } - if fromBlock > toBlock { - log.Debug("waiting for new blocks, last block ", toBlock) - lastBlock = d.waitForNewBlocks(ctx, toBlock) - continue - } - log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock) - blocks := d.getEventsByBlockRange(ctx, fromBlock, toBlock) - for _, b 
:= range blocks { - log.Debugf("sending block %d to the driver (with events)", b.Num) - downloadedCh <- b - } - if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock { - // Indicate the last downloaded block if there are not events on it - log.Debugf("sending block %d to the driver (without evvents)", toBlock) - downloadedCh <- block{ - blockHeader: d.getBlockHeader(ctx, toBlock), - } - } - fromBlock = toBlock + 1 - } -} - -type downloaderImplementation struct { - GERAddr common.Address - GERContract *polygonzkevmglobalexitrootv2.Polygonzkevmglobalexitrootv2 - ethClient EthClienter - blockFinality *big.Int -} - -func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) { - attempts := 0 - for { - header, err := d.ethClient.HeaderByNumber(ctx, d.blockFinality) - if err != nil { - attempts++ - log.Error("error geting last block num from eth client: ", err) - retryHandler("waitForNewBlocks", attempts) - continue - } - if header.Number.Uint64() > lastBlockSeen { - return header.Number.Uint64() - } - time.Sleep(waitForNewBlocksPeriod) - } -} - -func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []block { - blocks := []block{} - logs := d.getLogs(ctx, fromBlock, toBlock) - for _, l := range logs { - if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber { - b := d.getBlockHeader(ctx, l.BlockNumber) - blocks = append(blocks, block{ - blockHeader: blockHeader{ - Num: l.BlockNumber, - Hash: l.BlockHash, - Timestamp: b.Timestamp, - ParentHash: b.ParentHash, - }, - Events: []L1InfoTreeUpdate{}, - }) - } - d.appendLog(&blocks[len(blocks)-1], l) - } - - return blocks -} - -func (d *downloaderImplementation) getLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log { - query := ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(fromBlock), - Addresses: []common.Address{d.GERAddr}, - Topics: [][]common.Hash{ - 
{updateL1InfoTreeSignature}, - }, - ToBlock: new(big.Int).SetUint64(toBlock), - } - attempts := 0 - for { - logs, err := d.ethClient.FilterLogs(ctx, query) + appender := make(sync.LogAppenderMap) + appender[updateL1InfoTreeSignature] = func(b *sync.EVMBlock, l types.Log) error { + l1InfoTreeUpdate, err := contract.ParseUpdateL1InfoTree(l) if err != nil { - attempts++ - log.Error("error calling FilterLogs to eth client: ", err) - retryHandler("getLogs", attempts) - continue - } - return logs - } -} - -func (d *downloaderImplementation) appendLog(b *block, l types.Log) { - switch l.Topics[0] { - case updateL1InfoTreeSignature: - l1InfoTreeUpdate, err := d.GERContract.ParseUpdateL1InfoTree(l) - if err != nil { - log.Fatalf( - "error parsing log %+v using d.GERContract.ParseUpdateL1InfoTree: %v", + return fmt.Errorf( + "error parsing log %+v using contract.ParseUpdateL1InfoTree: %v", l, err, ) } @@ -200,26 +46,7 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { MainnetExitRoot: l1InfoTreeUpdate.MainnetExitRoot, RollupExitRoot: l1InfoTreeUpdate.RollupExitRoot, }) - default: - log.Fatalf("unexpected log %+v", l) - } -} - -func (d *downloaderImplementation) getBlockHeader(ctx context.Context, blockNum uint64) blockHeader { - attempts := 0 - for { - header, err := d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) - if err != nil { - attempts++ - log.Errorf("error getting block header for block %d, err: %v", blockNum, err) - retryHandler("getBlockHeader", attempts) - continue - } - return blockHeader{ - Num: header.Number.Uint64(), - Hash: header.Hash(), - ParentHash: header.ParentHash, - Timestamp: header.Time, - } + return nil } + return appender, nil } diff --git a/l1infotreesync/driver.go b/l1infotreesync/driver.go deleted file mode 100644 index c907f681..00000000 --- a/l1infotreesync/driver.go +++ /dev/null @@ -1,143 +0,0 @@ -package l1infotreesync - -import ( - "context" - - "github.com/0xPolygon/cdk/log" - 
"github.com/0xPolygon/cdk/reorgdetector" - "github.com/ethereum/go-ethereum/common" -) - -const ( - downloadBufferSize = 1000 - reorgDetectorID = "localbridgesync" -) - -type downloaderFull interface { - downloaderInterface - download(ctx context.Context, fromBlock uint64, downloadedCh chan block) -} - -type driver struct { - reorgDetector ReorgDetector - reorgSub *reorgdetector.Subscription - processor processorInterface - downloader downloaderFull -} - -type processorInterface interface { - getLastProcessedBlock(ctx context.Context) (uint64, error) - processBlock(block block) error - reorg(firstReorgedBlock uint64) error -} - -type ReorgDetector interface { - Subscribe(id string) (*reorgdetector.Subscription, error) - AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error -} - -func newDriver( - reorgDetector ReorgDetector, - processor processorInterface, - downloader downloaderFull, -) (*driver, error) { - reorgSub, err := reorgDetector.Subscribe(reorgDetectorID) - if err != nil { - return nil, err - } - return &driver{ - reorgDetector: reorgDetector, - reorgSub: reorgSub, - processor: processor, - downloader: downloader, - }, nil -} - -func (d *driver) Sync(ctx context.Context) { -reset: - var ( - lastProcessedBlock uint64 - attempts int - err error - ) - for { - lastProcessedBlock, err = d.processor.getLastProcessedBlock(ctx) - if err != nil { - attempts++ - log.Error("error geting last processed block: ", err) - retryHandler("Sync", attempts) - continue - } - break - } - cancellableCtx, cancel := context.WithCancel(ctx) - defer cancel() - - // start downloading - downloadCh := make(chan block, downloadBufferSize) - go d.downloader.download(cancellableCtx, lastProcessedBlock, downloadCh) - - for { - select { - case b := <-downloadCh: - log.Debug("handleNewBlock") - d.handleNewBlock(ctx, b) - case firstReorgedBlock := <-d.reorgSub.FirstReorgedBlock: - log.Debug("handleReorg") - d.handleReorg(cancel, downloadCh, 
firstReorgedBlock) - goto reset - } - } -} - -func (d *driver) handleNewBlock(ctx context.Context, b block) { - attempts := 0 - for { - err := d.reorgDetector.AddBlockToTrack(ctx, reorgDetectorID, b.Num, b.Hash) - if err != nil { - attempts++ - log.Errorf("error adding block %d to tracker: %v", b.Num, err) - retryHandler("handleNewBlock", attempts) - continue - } - break - } - attempts = 0 - for { - err := d.processor.processBlock(b) - if err != nil { - attempts++ - log.Errorf("error processing events for blcok %d, err: ", b.Num, err) - retryHandler("handleNewBlock", attempts) - continue - } - break - } -} - -func (d *driver) handleReorg( - cancel context.CancelFunc, downloadCh chan block, firstReorgedBlock uint64, -) { - // stop downloader - cancel() - _, ok := <-downloadCh - for ok { - _, ok = <-downloadCh - } - // handle reorg - attempts := 0 - for { - err := d.processor.reorg(firstReorgedBlock) - if err != nil { - attempts++ - log.Errorf( - "error processing reorg, last valid block %d, err: %v", - firstReorgedBlock, err, - ) - retryHandler("handleReorg", attempts) - continue - } - break - } - d.reorgSub.ReorgProcessed <- true -} diff --git a/l1infotreesync/driver_test.go b/l1infotreesync/driver_test.go deleted file mode 100644 index e206973a..00000000 --- a/l1infotreesync/driver_test.go +++ /dev/null @@ -1,211 +0,0 @@ -package l1infotreesync - -import ( - "context" - "errors" - "sync" - "testing" - "time" - - "github.com/0xPolygon/cdk/log" - "github.com/0xPolygon/cdk/reorgdetector" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func TestSync(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - rdm := NewReorgDetectorMock(t) - pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) - firstReorgedBlock := make(chan uint64) - reorgProcessed := make(chan bool) - rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ - FirstReorgedBlock: firstReorgedBlock, - 
ReorgProcessed: reorgProcessed, - }) - driver, err := newDriver(rdm, pm, dm) - require.NoError(t, err) - ctx := context.Background() - expectedBlock1 := block{ - blockHeader: blockHeader{ - Num: 3, - Hash: common.HexToHash("03"), - }, - } - expectedBlock2 := block{ - blockHeader: blockHeader{ - Num: 9, - Hash: common.HexToHash("09"), - }, - } - type reorgSemaphore struct { - mu sync.Mutex - green bool - } - reorg1Completed := reorgSemaphore{} - dm.On("download", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - ctx := args.Get(0).(context.Context) - downloadedCh := args.Get(2).(chan block) - log.Info("entering mock loop") - for { - select { - case <-ctx.Done(): - log.Info("closing channel") - close(downloadedCh) - return - default: - } - reorg1Completed.mu.Lock() - green := reorg1Completed.green - reorg1Completed.mu.Unlock() - if green { - downloadedCh <- expectedBlock2 - } else { - downloadedCh <- expectedBlock1 - } - time.Sleep(100 * time.Millisecond) - } - }) - - // Mocking this actions, the driver should "store" all the blocks from the downloader - pm.On("getLastProcessedBlock", ctx). - Return(uint64(3), nil) - rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock1.Num, expectedBlock1.Hash). - Return(nil) - pm.On("storeBridgeEvents", expectedBlock1.Num, expectedBlock1.Events). - Return(nil) - rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock2.Num, expectedBlock2.Hash). - Return(nil) - pm.On("storeBridgeEvents", expectedBlock2.Num, expectedBlock2.Events). 
- Return(nil) - go driver.Sync(ctx) - time.Sleep(time.Millisecond * 200) // time to download expectedBlock1 - - // Trigger reorg 1 - reorgedBlock1 := uint64(5) - pm.On("reorg", reorgedBlock1).Return(nil) - firstReorgedBlock <- reorgedBlock1 - ok := <-reorgProcessed - require.True(t, ok) - reorg1Completed.mu.Lock() - reorg1Completed.green = true - reorg1Completed.mu.Unlock() - time.Sleep(time.Millisecond * 200) // time to download expectedBlock2 - - // Trigger reorg 2: syncer restarts the porcess - reorgedBlock2 := uint64(7) - pm.On("reorg", reorgedBlock2).Return(nil) - firstReorgedBlock <- reorgedBlock2 - ok = <-reorgProcessed - require.True(t, ok) -} - -func TestHandleNewBlock(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - rdm := NewReorgDetectorMock(t) - pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) - rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}) - driver, err := newDriver(rdm, pm, dm) - require.NoError(t, err) - ctx := context.Background() - - // happy path - b1 := block{ - blockHeader: blockHeader{ - Num: 1, - Hash: common.HexToHash("f00"), - }, - } - rdm. - On("AddBlockToTrack", ctx, reorgDetectorID, b1.Num, b1.Hash). - Return(nil) - pm.On("storeBridgeEvents", b1.Num, b1.Events). - Return(nil) - driver.handleNewBlock(ctx, b1) - - // reorg deteector fails once - b2 := block{ - blockHeader: blockHeader{ - Num: 2, - Hash: common.HexToHash("f00"), - }, - } - rdm. - On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash). - Return(errors.New("foo")).Once() - rdm. - On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash). - Return(nil).Once() - pm.On("storeBridgeEvents", b2.Num, b2.Events). - Return(nil) - driver.handleNewBlock(ctx, b2) - - // processor fails once - b3 := block{ - blockHeader: blockHeader{ - Num: 3, - Hash: common.HexToHash("f00"), - }, - } - rdm. - On("AddBlockToTrack", ctx, reorgDetectorID, b3.Num, b3.Hash). - Return(nil) - pm.On("storeBridgeEvents", b3.Num, b3.Events). 
- Return(errors.New("foo")).Once() - pm.On("storeBridgeEvents", b3.Num, b3.Events). - Return(nil).Once() - driver.handleNewBlock(ctx, b3) - -} - -func TestHandleReorg(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - rdm := NewReorgDetectorMock(t) - pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) - reorgProcessed := make(chan bool) - rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ - ReorgProcessed: reorgProcessed, - }) - driver, err := newDriver(rdm, pm, dm) - require.NoError(t, err) - ctx := context.Background() - - // happy path - _, cancel := context.WithCancel(ctx) - downloadCh := make(chan block) - firstReorgedBlock := uint64(5) - pm.On("reorg", firstReorgedBlock).Return(nil) - go driver.handleReorg(cancel, downloadCh, firstReorgedBlock) - close(downloadCh) - done := <-reorgProcessed - require.True(t, done) - - // download ch sends some garbage - _, cancel = context.WithCancel(ctx) - downloadCh = make(chan block) - firstReorgedBlock = uint64(6) - pm.On("reorg", firstReorgedBlock).Return(nil) - go driver.handleReorg(cancel, downloadCh, firstReorgedBlock) - downloadCh <- block{} - downloadCh <- block{} - downloadCh <- block{} - close(downloadCh) - done = <-reorgProcessed - require.True(t, done) - - // processor fails 2 times - _, cancel = context.WithCancel(ctx) - downloadCh = make(chan block) - firstReorgedBlock = uint64(7) - pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once() - pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once() - pm.On("reorg", firstReorgedBlock).Return(nil).Once() - go driver.handleReorg(cancel, downloadCh, firstReorgedBlock) - close(downloadCh) - done = <-reorgProcessed - require.True(t, done) -} diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index baad1d53..dc5734d5 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -43,7 +43,6 @@ func newSimulatedClient(auth *bind.TransactOpts) ( } func TestE2E(t *testing.T) { - 
waitForNewBlocksPeriod = time.Millisecond ctx := context.Background() dbPath := t.TempDir() privateKey, err := crypto.GenerateKey() @@ -51,13 +50,13 @@ func TestE2E(t *testing.T) { auth, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(1337)) require.NoError(t, err) rdm := NewReorgDetectorMock(t) - rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}) + rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}, nil) rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) client, gerAddr, gerSc, err := newSimulatedClient(auth) require.NoError(t, err) - syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), 32) + syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), 32, time.Millisecond) require.NoError(t, err) - go syncer.Sync(ctx) + go syncer.Start(ctx) // Update GER 10 times // TODO: test syncer restart diff --git a/l1infotreesync/l1infotreesync.go b/l1infotreesync/l1infotreesync.go index fa04f0e6..847488df 100644 --- a/l1infotreesync/l1infotreesync.go +++ b/l1infotreesync/l1infotreesync.go @@ -5,18 +5,23 @@ import ( "time" "github.com/0xPolygon/cdk/etherman" - "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum/common" ) +const ( + reorgDetectorID = "l1infotreesync" + downloadBufferSize = 1000 +) + var ( retryAfterErrorPeriod = time.Second * 10 maxRetryAttemptsAfterError = 5 ) type L1InfoTreeSync struct { - *processor - *driver + processor *processor + driver *sync.EVMDriver } func New( @@ -25,31 +30,66 @@ func New( globalExitRoot common.Address, syncBlockChunkSize uint64, blockFinalityType etherman.BlockNumberFinality, - rd ReorgDetector, + rd sync.ReorgDetector, l1Client EthClienter, treeHeight uint8, + waitForNewBlocksPeriod time.Duration, ) (*L1InfoTreeSync, error) { - p, err := newProcessor(ctx, dbPath, treeHeight) + processor, err := newProcessor(ctx, 
dbPath, treeHeight) if err != nil { return nil, err } - dwn, err := newDownloader(globalExitRoot, l1Client, syncBlockChunkSize, blockFinalityType) + + appender, err := buildAppender(l1Client, globalExitRoot) if err != nil { return nil, err } - dri, err := newDriver(rd, p, dwn) + downloader, err := sync.NewEVMDownloader( + l1Client, + syncBlockChunkSize, + blockFinalityType, + waitForNewBlocksPeriod, + appender, + []common.Address{globalExitRoot}, + ) if err != nil { return nil, err } - return &L1InfoTreeSync{p, dri}, nil -} -func retryHandler(funcName string, attempts int) { - if attempts >= maxRetryAttemptsAfterError { - log.Fatalf( - "%s failed too many times (%d)", - funcName, maxRetryAttemptsAfterError, - ) + driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize) + if err != nil { + return nil, err } - time.Sleep(retryAfterErrorPeriod) + return &L1InfoTreeSync{ + processor: processor, + driver: driver, + }, nil +} + +func (s *L1InfoTreeSync) Start(ctx context.Context) { + s.driver.Sync(ctx) +} + +func (s *L1InfoTreeSync) ComputeMerkleProofByIndex(ctx context.Context, index uint32) ([][32]byte, common.Hash, error) { + return s.processor.ComputeMerkleProofByIndex(ctx, index) +} + +func (s *L1InfoTreeSync) ComputeMerkleProofByRoot(ctx context.Context, root common.Hash) ([][32]byte, common.Hash, error) { + return s.processor.ComputeMerkleProofByRoot(ctx, root) +} + +func (s *L1InfoTreeSync) GetInfoByRoot(ctx context.Context, root common.Hash) (*L1InfoTreeLeaf, error) { + return s.processor.GetInfoByRoot(ctx, root) +} + +func (s *L1InfoTreeSync) GetLatestInfoUntilBlock(ctx context.Context, blockNum uint64) (*L1InfoTreeLeaf, error) { + return s.processor.GetLatestInfoUntilBlock(ctx, blockNum) +} + +func (s *L1InfoTreeSync) GetInfoByIndex(ctx context.Context, index uint32) (*L1InfoTreeLeaf, error) { + return s.processor.GetInfoByIndex(ctx, index) +} + +func (s *L1InfoTreeSync) GetInfoByHash(ctx context.Context, hash []byte) 
(*L1InfoTreeLeaf, error) { + return s.processor.GetInfoByHash(ctx, hash) } diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index 12047d77..2247fae3 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -7,6 +7,7 @@ import ( "errors" "github.com/0xPolygon/cdk/l1infotree" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" @@ -268,7 +269,7 @@ func (p *processor) getInfoByHashWithTx(tx kv.Tx, hash []byte) (*L1InfoTreeLeaf, }, nil } -func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) { +func (p *processor) GetLastProcessedBlock(ctx context.Context) (uint64, error) { tx, err := p.db.BeginRo(ctx) if err != nil { return 0, err @@ -287,7 +288,7 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { } } -func (p *processor) reorg(firstReorgedBlock uint64) error { +func (p *processor) Reorg(firstReorgedBlock uint64) error { // TODO: Does tree need to be reorged? 
tx, err := p.db.BeginRw(context.Background()) if err != nil { @@ -362,9 +363,9 @@ func (p *processor) deleteLeaf(tx kv.RwTx, index uint32) error { return nil } -// processBlock process the leafs of the L1 info tree found on a block +// ProcessBlock process the leafs of the L1 info tree found on a block // this function can be called without leafs with the intention to track the last processed block -func (p *processor) processBlock(b block) error { +func (p *processor) ProcessBlock(b sync.EVMBlock) error { tx, err := p.db.BeginRw(context.Background()) if err != nil { return err @@ -380,11 +381,12 @@ func (p *processor) processBlock(b block) error { } else { initialIndex = lastIndex + 1 } - for i, l := range b.Events { + for i, e := range b.Events { + event := e.(L1InfoTreeUpdate) leafToStore := storeLeaf{ Index: initialIndex + uint32(i), - MainnetExitRoot: l.MainnetExitRoot, - RollupExitRoot: l.RollupExitRoot, + MainnetExitRoot: event.MainnetExitRoot, + RollupExitRoot: event.RollupExitRoot, ParentHash: b.ParentHash, Timestamp: b.Timestamp, BlockNumber: b.Num, diff --git a/l1infotreesync/processor_test.go b/l1infotreesync/processor_test.go index 949a3fde..b6f6c942 100644 --- a/l1infotreesync/processor_test.go +++ b/l1infotreesync/processor_test.go @@ -1,373 +1,363 @@ package l1infotreesync -import ( - "context" - "fmt" - "slices" - "testing" +// func TestProceessor(t *testing.T) { +// path := t.TempDir() +// ctx := context.Background() +// p, err := newProcessor(ctx, path, 32) +// require.NoError(t, err) +// actions := []processAction{ +// // processed: ~ +// &getLastProcessedBlockAction{ +// p: p, +// description: "on an empty processor", +// ctx: context.Background(), +// expectedLastProcessedBlock: 0, +// expectedErr: nil, +// }, +// &reorgAction{ +// p: p, +// description: "on an empty processor: firstReorgedBlock = 0", +// firstReorgedBlock: 0, +// expectedErr: nil, +// }, +// &reorgAction{ +// p: p, +// description: "on an empty processor: firstReorgedBlock = 
1", +// firstReorgedBlock: 1, +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "on an empty processor", +// ctx: context.Background(), +// fromBlock: 0, +// toBlock: 2, +// expectedEvents: nil, +// expectedErr: ErrBlockNotProcessed, +// }, +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block1", +// b: block1, +// expectedErr: nil, +// }, +// // processed: block1 +// &getLastProcessedBlockAction{ +// p: p, +// description: "after block1", +// ctx: context.Background(), +// expectedLastProcessedBlock: 1, +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block1: range 0, 2", +// ctx: context.Background(), +// fromBlock: 0, +// toBlock: 2, +// expectedEvents: nil, +// expectedErr: ErrBlockNotProcessed, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block1: range 1, 1", +// ctx: context.Background(), +// fromBlock: 1, +// toBlock: 1, +// expectedEvents: block1.Events, +// expectedErr: nil, +// }, +// &reorgAction{ +// p: p, +// description: "after block1", +// firstReorgedBlock: 1, +// expectedErr: nil, +// }, +// // processed: ~ +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block1 reorged", +// ctx: context.Background(), +// fromBlock: 0, +// toBlock: 2, +// expectedEvents: nil, +// expectedErr: ErrBlockNotProcessed, +// }, +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block1 (after it's reorged)", +// b: block1, +// expectedErr: nil, +// }, +// // processed: block3 +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block3", +// b: block3, +// expectedErr: nil, +// }, +// // processed: block1, block3 +// &getLastProcessedBlockAction{ +// p: p, +// description: "after block3", +// ctx: context.Background(), +// expectedLastProcessedBlock: 3, +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block3: range 2, 2", +// ctx: context.Background(), +// 
fromBlock: 2, +// toBlock: 2, +// expectedEvents: []L1InfoTreeUpdate{}, +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block3: range 1, 3", +// ctx: context.Background(), +// fromBlock: 1, +// toBlock: 3, +// expectedEvents: append(block1.Events, block3.Events...), +// expectedErr: nil, +// }, +// &reorgAction{ +// p: p, +// description: "after block3, with value 3", +// firstReorgedBlock: 3, +// expectedErr: nil, +// }, +// // processed: block1 +// &getLastProcessedBlockAction{ +// p: p, +// description: "after block3 reorged", +// ctx: context.Background(), +// expectedLastProcessedBlock: 2, +// expectedErr: nil, +// }, +// &reorgAction{ +// p: p, +// description: "after block3, with value 2", +// firstReorgedBlock: 2, +// expectedErr: nil, +// }, +// &getLastProcessedBlockAction{ +// p: p, +// description: "after block2 reorged", +// ctx: context.Background(), +// expectedLastProcessedBlock: 1, +// expectedErr: nil, +// }, +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block3 after reorg", +// b: block3, +// expectedErr: nil, +// }, +// // processed: block1, block3 +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block4", +// b: block4, +// expectedErr: nil, +// }, +// // processed: block1, block3, block4 +// &storeL1InfoTreeUpdatesAction{ +// p: p, +// description: "block5", +// b: block5, +// expectedErr: nil, +// }, +// // processed: block1, block3, block4, block5 +// &getLastProcessedBlockAction{ +// p: p, +// description: "after block5", +// ctx: context.Background(), +// expectedLastProcessedBlock: 5, +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block5: range 1, 3", +// ctx: context.Background(), +// fromBlock: 1, +// toBlock: 3, +// expectedEvents: append(block1.Events, block3.Events...), +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block5: range 4, 5", +// ctx: 
context.Background(), +// fromBlock: 4, +// toBlock: 5, +// expectedEvents: append(block4.Events, block5.Events...), +// expectedErr: nil, +// }, +// &getClaimsAndBridgesAction{ +// p: p, +// description: "after block5: range 0, 5", +// ctx: context.Background(), +// fromBlock: 0, +// toBlock: 5, +// expectedEvents: slices.Concat( +// block1.Events, +// block3.Events, +// block4.Events, +// block5.Events, +// ), +// expectedErr: nil, +// }, +// } - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/require" -) +// for _, a := range actions { +// t.Run(fmt.Sprintf("%s: %s", a.method(), a.desc()), a.execute) +// } +// } -func TestProceessor(t *testing.T) { - path := t.TempDir() - ctx := context.Background() - p, err := newProcessor(ctx, path, 32) - require.NoError(t, err) - actions := []processAction{ - // processed: ~ - &getLastProcessedBlockAction{ - p: p, - description: "on an empty processor", - ctx: context.Background(), - expectedLastProcessedBlock: 0, - expectedErr: nil, - }, - &reorgAction{ - p: p, - description: "on an empty processor: firstReorgedBlock = 0", - firstReorgedBlock: 0, - expectedErr: nil, - }, - &reorgAction{ - p: p, - description: "on an empty processor: firstReorgedBlock = 1", - firstReorgedBlock: 1, - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "on an empty processor", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedEvents: nil, - expectedErr: ErrBlockNotProcessed, - }, - &storeL1InfoTreeUpdatesAction{ - p: p, - description: "block1", - b: block1, - expectedErr: nil, - }, - // processed: block1 - &getLastProcessedBlockAction{ - p: p, - description: "after block1", - ctx: context.Background(), - expectedLastProcessedBlock: 1, - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block1: range 0, 2", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedEvents: nil, - expectedErr: ErrBlockNotProcessed, - }, - 
&getClaimsAndBridgesAction{ - p: p, - description: "after block1: range 1, 1", - ctx: context.Background(), - fromBlock: 1, - toBlock: 1, - expectedEvents: block1.Events, - expectedErr: nil, - }, - &reorgAction{ - p: p, - description: "after block1", - firstReorgedBlock: 1, - expectedErr: nil, - }, - // processed: ~ - &getClaimsAndBridgesAction{ - p: p, - description: "after block1 reorged", - ctx: context.Background(), - fromBlock: 0, - toBlock: 2, - expectedEvents: nil, - expectedErr: ErrBlockNotProcessed, - }, - &storeL1InfoTreeUpdatesAction{ - p: p, - description: "block1 (after it's reorged)", - b: block1, - expectedErr: nil, - }, - // processed: block3 - &storeL1InfoTreeUpdatesAction{ - p: p, - description: "block3", - b: block3, - expectedErr: nil, - }, - // processed: block1, block3 - &getLastProcessedBlockAction{ - p: p, - description: "after block3", - ctx: context.Background(), - expectedLastProcessedBlock: 3, - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block3: range 2, 2", - ctx: context.Background(), - fromBlock: 2, - toBlock: 2, - expectedEvents: []L1InfoTreeUpdate{}, - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block3: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedEvents: append(block1.Events, block3.Events...), - expectedErr: nil, - }, - &reorgAction{ - p: p, - description: "after block3, with value 3", - firstReorgedBlock: 3, - expectedErr: nil, - }, - // processed: block1 - &getLastProcessedBlockAction{ - p: p, - description: "after block3 reorged", - ctx: context.Background(), - expectedLastProcessedBlock: 2, - expectedErr: nil, - }, - &reorgAction{ - p: p, - description: "after block3, with value 2", - firstReorgedBlock: 2, - expectedErr: nil, - }, - &getLastProcessedBlockAction{ - p: p, - description: "after block2 reorged", - ctx: context.Background(), - expectedLastProcessedBlock: 1, - expectedErr: nil, - }, - 
&storeL1InfoTreeUpdatesAction{ - p: p, - description: "block3 after reorg", - b: block3, - expectedErr: nil, - }, - // processed: block1, block3 - &storeL1InfoTreeUpdatesAction{ - p: p, - description: "block4", - b: block4, - expectedErr: nil, - }, - // processed: block1, block3, block4 - &storeL1InfoTreeUpdatesAction{ - p: p, - description: "block5", - b: block5, - expectedErr: nil, - }, - // processed: block1, block3, block4, block5 - &getLastProcessedBlockAction{ - p: p, - description: "after block5", - ctx: context.Background(), - expectedLastProcessedBlock: 5, - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedEvents: append(block1.Events, block3.Events...), - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 4, 5", - ctx: context.Background(), - fromBlock: 4, - toBlock: 5, - expectedEvents: append(block4.Events, block5.Events...), - expectedErr: nil, - }, - &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 0, 5", - ctx: context.Background(), - fromBlock: 0, - toBlock: 5, - expectedEvents: slices.Concat( - block1.Events, - block3.Events, - block4.Events, - block5.Events, - ), - expectedErr: nil, - }, - } +// // BOILERPLATE - for _, a := range actions { - t.Run(fmt.Sprintf("%s: %s", a.method(), a.desc()), a.execute) - } -} +// // blocks -// BOILERPLATE +// var ( +// block1 = block{ +// blockHeader: blockHeader{ +// Num: 1, +// Hash: common.HexToHash("01"), +// }, +// Events: []L1InfoTreeUpdate{ +// {RollupExitRoot: common.HexToHash("01")}, +// {MainnetExitRoot: common.HexToHash("01")}, +// }, +// } +// block3 = block{ +// blockHeader: blockHeader{ +// Num: 3, +// Hash: common.HexToHash("02"), +// }, +// Events: []L1InfoTreeUpdate{ +// {RollupExitRoot: common.HexToHash("02"), MainnetExitRoot: common.HexToHash("02")}, +// }, +// } +// block4 = block{ +// blockHeader: 
blockHeader{ +// Num: 4, +// Hash: common.HexToHash("03"), +// }, +// Events: []L1InfoTreeUpdate{}, +// } +// block5 = block{ +// blockHeader: blockHeader{ +// Num: 5, +// Hash: common.HexToHash("04"), +// }, +// Events: []L1InfoTreeUpdate{ +// {RollupExitRoot: common.HexToHash("04")}, +// {MainnetExitRoot: common.HexToHash("05")}, +// }, +// } +// ) -// blocks +// // actions -var ( - block1 = block{ - blockHeader: blockHeader{ - Num: 1, - Hash: common.HexToHash("01"), - }, - Events: []L1InfoTreeUpdate{ - {RollupExitRoot: common.HexToHash("01")}, - {MainnetExitRoot: common.HexToHash("01")}, - }, - } - block3 = block{ - blockHeader: blockHeader{ - Num: 3, - Hash: common.HexToHash("02"), - }, - Events: []L1InfoTreeUpdate{ - {RollupExitRoot: common.HexToHash("02"), MainnetExitRoot: common.HexToHash("02")}, - }, - } - block4 = block{ - blockHeader: blockHeader{ - Num: 4, - Hash: common.HexToHash("03"), - }, - Events: []L1InfoTreeUpdate{}, - } - block5 = block{ - blockHeader: blockHeader{ - Num: 5, - Hash: common.HexToHash("04"), - }, - Events: []L1InfoTreeUpdate{ - {RollupExitRoot: common.HexToHash("04")}, - {MainnetExitRoot: common.HexToHash("05")}, - }, - } -) +// type processAction interface { +// method() string +// desc() string +// execute(t *testing.T) +// } -// actions +// // GetClaimsAndBridges -type processAction interface { - method() string - desc() string - execute(t *testing.T) -} +// type getClaimsAndBridgesAction struct { +// p *processor +// description string +// ctx context.Context +// fromBlock uint64 +// toBlock uint64 +// expectedEvents []L1InfoTreeUpdate +// expectedErr error +// } -// GetClaimsAndBridges +// func (a *getClaimsAndBridgesAction) method() string { +// return "GetClaimsAndBridges" +// } -type getClaimsAndBridgesAction struct { - p *processor - description string - ctx context.Context - fromBlock uint64 - toBlock uint64 - expectedEvents []L1InfoTreeUpdate - expectedErr error -} +// func (a *getClaimsAndBridgesAction) desc() string { 
+// return a.description +// } -func (a *getClaimsAndBridgesAction) method() string { - return "GetClaimsAndBridges" -} +// func (a *getClaimsAndBridgesAction) execute(t *testing.T) { +// // TODO: add relevant getters +// // actualEvents, actualErr := a.p.GetClaimsAndBridges(a.ctx, a.fromBlock, a.toBlock) +// // require.Equal(t, a.expectedEvents, actualEvents) +// // require.Equal(t, a.expectedErr, actualErr) +// } -func (a *getClaimsAndBridgesAction) desc() string { - return a.description -} +// // getLastProcessedBlock -func (a *getClaimsAndBridgesAction) execute(t *testing.T) { - // TODO: add relevant getters - // actualEvents, actualErr := a.p.GetClaimsAndBridges(a.ctx, a.fromBlock, a.toBlock) - // require.Equal(t, a.expectedEvents, actualEvents) - // require.Equal(t, a.expectedErr, actualErr) -} +// type getLastProcessedBlockAction struct { +// p *processor +// description string +// ctx context.Context +// expectedLastProcessedBlock uint64 +// expectedErr error +// } -// getLastProcessedBlock +// func (a *getLastProcessedBlockAction) method() string { +// return "getLastProcessedBlock" +// } -type getLastProcessedBlockAction struct { - p *processor - description string - ctx context.Context - expectedLastProcessedBlock uint64 - expectedErr error -} +// func (a *getLastProcessedBlockAction) desc() string { +// return a.description +// } -func (a *getLastProcessedBlockAction) method() string { - return "getLastProcessedBlock" -} +// func (a *getLastProcessedBlockAction) execute(t *testing.T) { +// actualLastProcessedBlock, actualErr := a.p.getLastProcessedBlock(a.ctx) +// require.Equal(t, a.expectedLastProcessedBlock, actualLastProcessedBlock) +// require.Equal(t, a.expectedErr, actualErr) +// } -func (a *getLastProcessedBlockAction) desc() string { - return a.description -} +// // reorg -func (a *getLastProcessedBlockAction) execute(t *testing.T) { - actualLastProcessedBlock, actualErr := a.p.getLastProcessedBlock(a.ctx) - require.Equal(t, 
a.expectedLastProcessedBlock, actualLastProcessedBlock) - require.Equal(t, a.expectedErr, actualErr) -} +// type reorgAction struct { +// p *processor +// description string +// firstReorgedBlock uint64 +// expectedErr error +// } -// reorg +// func (a *reorgAction) method() string { +// return "reorg" +// } -type reorgAction struct { - p *processor - description string - firstReorgedBlock uint64 - expectedErr error -} +// func (a *reorgAction) desc() string { +// return a.description +// } -func (a *reorgAction) method() string { - return "reorg" -} +// func (a *reorgAction) execute(t *testing.T) { +// actualErr := a.p.reorg(a.firstReorgedBlock) +// require.Equal(t, a.expectedErr, actualErr) +// } -func (a *reorgAction) desc() string { - return a.description -} +// // storeL1InfoTreeUpdates -func (a *reorgAction) execute(t *testing.T) { - actualErr := a.p.reorg(a.firstReorgedBlock) - require.Equal(t, a.expectedErr, actualErr) -} +// type storeL1InfoTreeUpdatesAction struct { +// p *processor +// description string +// b block +// expectedErr error +// } -// storeL1InfoTreeUpdates +// func (a *storeL1InfoTreeUpdatesAction) method() string { +// return "storeL1InfoTreeUpdates" +// } -type storeL1InfoTreeUpdatesAction struct { - p *processor - description string - b block - expectedErr error -} +// func (a *storeL1InfoTreeUpdatesAction) desc() string { +// return a.description +// } -func (a *storeL1InfoTreeUpdatesAction) method() string { - return "storeL1InfoTreeUpdates" -} - -func (a *storeL1InfoTreeUpdatesAction) desc() string { - return a.description -} - -func (a *storeL1InfoTreeUpdatesAction) execute(t *testing.T) { - actualErr := a.p.processBlock(a.b) - require.Equal(t, a.expectedErr, actualErr) -} +// func (a *storeL1InfoTreeUpdatesAction) execute(t *testing.T) { +// actualErr := a.p.processBlock(a.b) +// require.Equal(t, a.expectedErr, actualErr) +// } diff --git a/localbridgesync/downloader.go b/localbridgesync/downloader.go index 9763f818..955655f1 100644 
--- a/localbridgesync/downloader.go +++ b/localbridgesync/downloader.go @@ -1,14 +1,13 @@ package localbridgesync import ( - "context" + "fmt" "math/big" "time" "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge" "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2" - "github.com/0xPolygon/cdk/etherman" - "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -33,159 +32,45 @@ type EthClienter interface { bind.ContractBackend } -type downloaderInterface interface { - waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) - getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []block - getLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log - appendLog(b *block, l types.Log) - getBlockHeader(ctx context.Context, blockNum uint64) blockHeader +type Bridge struct { + LeafType uint8 + OriginNetwork uint32 + OriginAddress common.Address + DestinationNetwork uint32 + DestinationAddress common.Address + Amount *big.Int + Metadata []byte + DepositCount uint32 } -type downloader struct { - syncBlockChunkSize uint64 - downloaderInterface +type Claim struct { + GlobalIndex *big.Int + OriginNetwork uint32 + OriginAddress common.Address + DestinationAddress common.Address + Amount *big.Int } -func newDownloader( - bridgeAddr common.Address, - ethClient EthClienter, - syncBlockChunkSize uint64, - blockFinalityType etherman.BlockNumberFinality, -) (*downloader, error) { - bridgeContractV1, err := polygonzkevmbridge.NewPolygonzkevmbridge(bridgeAddr, ethClient) - if err != nil { - return nil, err - } - bridgeContractV2, err := polygonzkevmbridgev2.NewPolygonzkevmbridgev2(bridgeAddr, ethClient) +type BridgeEvent struct { + Bridge *Bridge + Claim *Claim +} + +func buildAppender(client EthClienter, bridge common.Address) 
(sync.LogAppenderMap, error) { + bridgeContractV1, err := polygonzkevmbridge.NewPolygonzkevmbridge(bridge, client) if err != nil { return nil, err } - finality, err := blockFinalityType.ToBlockNum() + bridgeContractV2, err := polygonzkevmbridgev2.NewPolygonzkevmbridgev2(bridge, client) if err != nil { return nil, err } - return &downloader{ - syncBlockChunkSize: syncBlockChunkSize, - downloaderInterface: &downloaderImplementation{ - bridgeAddr: bridgeAddr, - bridgeContractV1: bridgeContractV1, - bridgeContractV2: bridgeContractV2, - ethClient: ethClient, - blockFinality: finality, - }, - }, nil -} - -func (d *downloader) download(ctx context.Context, fromBlock uint64, downloadedCh chan block) { - lastBlock := d.waitForNewBlocks(ctx, 0) - for { - select { - case <-ctx.Done(): - log.Debug("closing channel") - close(downloadedCh) - return - default: - } - toBlock := fromBlock + d.syncBlockChunkSize - if toBlock > lastBlock { - toBlock = lastBlock - } - if fromBlock > toBlock { - log.Debug("waiting for new blocks, last block ", toBlock) - lastBlock = d.waitForNewBlocks(ctx, toBlock) - continue - } - log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock) - blocks := d.getEventsByBlockRange(ctx, fromBlock, toBlock) - for _, b := range blocks { - log.Debugf("sending block %d to the driver (with events)", b.Num) - downloadedCh <- b - } - if len(blocks) == 0 || blocks[len(blocks)-1].Num < toBlock { - // Indicate the last downloaded block if there are not events on it - log.Debugf("sending block %d to the driver (without evvents)", toBlock) - downloadedCh <- block{ - blockHeader: d.getBlockHeader(ctx, toBlock), - } - } - fromBlock = toBlock + 1 - } -} - -type downloaderImplementation struct { - bridgeAddr common.Address - bridgeContractV1 *polygonzkevmbridge.Polygonzkevmbridge - bridgeContractV2 *polygonzkevmbridgev2.Polygonzkevmbridgev2 - ethClient EthClienter - blockFinality *big.Int -} - -func (d *downloaderImplementation) waitForNewBlocks(ctx 
context.Context, lastBlockSeen uint64) (newLastBlock uint64) { - attempts := 0 - for { - header, err := d.ethClient.HeaderByNumber(ctx, d.blockFinality) - if err != nil { - attempts++ - log.Error("error geting last block num from eth client: ", err) - retryHandler("waitForNewBlocks", attempts) - continue - } - if header.Number.Uint64() > lastBlockSeen { - return header.Number.Uint64() - } - time.Sleep(waitForNewBlocksPeriod) - } -} - -func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []block { - blocks := []block{} - logs := d.getLogs(ctx, fromBlock, toBlock) - for _, l := range logs { - if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber { - blocks = append(blocks, block{ - blockHeader: blockHeader{ - Num: l.BlockNumber, - Hash: l.BlockHash, - }, - Events: []BridgeEvent{}, - }) - } - d.appendLog(&blocks[len(blocks)-1], l) - } - - return blocks -} + appender := make(sync.LogAppenderMap) -func (d *downloaderImplementation) getLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log { - query := ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(fromBlock), - Addresses: []common.Address{d.bridgeAddr}, - Topics: [][]common.Hash{ - {bridgeEventSignature}, - {claimEventSignature}, - {claimEventSignaturePreEtrog}, - }, - ToBlock: new(big.Int).SetUint64(toBlock), - } - attempts := 0 - for { - logs, err := d.ethClient.FilterLogs(ctx, query) + appender[bridgeEventSignature] = func(b *sync.EVMBlock, l types.Log) error { + bridge, err := bridgeContractV2.ParseBridgeEvent(l) if err != nil { - attempts++ - log.Error("error calling FilterLogs to eth client: ", err) - retryHandler("getLogs", attempts) - continue - } - return logs - } -} - -func (d *downloaderImplementation) appendLog(b *block, l types.Log) { - switch l.Topics[0] { - case bridgeEventSignature: - bridge, err := d.bridgeContractV2.ParseBridgeEvent(l) - if err != nil { - log.Fatalf( + return fmt.Errorf( "error parsing log %+v using 
d.bridgeContractV2.ParseBridgeEvent: %v", l, err, ) @@ -200,10 +85,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { Metadata: bridge.Metadata, DepositCount: bridge.DepositCount, }}) - case claimEventSignature: - claim, err := d.bridgeContractV2.ParseClaimEvent(l) + return nil + } + + appender[claimEventSignature] = func(b *sync.EVMBlock, l types.Log) error { + claim, err := bridgeContractV2.ParseClaimEvent(l) if err != nil { - log.Fatalf( + return fmt.Errorf( "error parsing log %+v using d.bridgeContractV2.ParseClaimEvent: %v", l, err, ) @@ -215,10 +103,13 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { DestinationAddress: claim.DestinationAddress, Amount: claim.Amount, }}) - case claimEventSignaturePreEtrog: - claim, err := d.bridgeContractV1.ParseClaimEvent(l) + return nil + } + + appender[claimEventSignaturePreEtrog] = func(b *sync.EVMBlock, l types.Log) error { + claim, err := bridgeContractV1.ParseClaimEvent(l) if err != nil { - log.Fatalf( + return fmt.Errorf( "error parsing log %+v using d.bridgeContractV1.ParseClaimEvent: %v", l, err, ) @@ -230,24 +121,8 @@ func (d *downloaderImplementation) appendLog(b *block, l types.Log) { DestinationAddress: claim.DestinationAddress, Amount: claim.Amount, }}) - default: - log.Fatalf("unexpected log %+v", l) + return nil } -} -func (d *downloaderImplementation) getBlockHeader(ctx context.Context, blockNum uint64) blockHeader { - attempts := 0 - for { - header, err := d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) - if err != nil { - attempts++ - log.Errorf("error getting block header for block %d, err: %v", blockNum, err) - retryHandler("getBlockHeader", attempts) - continue - } - return blockHeader{ - Num: header.Number.Uint64(), - Hash: header.Hash(), - } - } + return appender, nil } diff --git a/localbridgesync/downloader_test.go b/localbridgesync/downloader_test.go deleted file mode 100644 index 553efbec..00000000 --- a/localbridgesync/downloader_test.go +++ 
/dev/null @@ -1,467 +0,0 @@ -package localbridgesync - -import ( - "context" - "errors" - "math/big" - "testing" - "time" - - "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridge" - "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonzkevmbridgev2" - cdkcommon "github.com/0xPolygon/cdk/common" - "github.com/0xPolygon/cdk/etherman" - "github.com/0xPolygon/cdk/log" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -var ( - contractAddr = common.HexToAddress("1234567890") -) - -const ( - syncBlockChunck = uint64(10) -) - -func TestGetEventsByBlockRange(t *testing.T) { - type testCase struct { - description string - inputLogs []types.Log - fromBlock, toBlock uint64 - expectedBlocks []block - } - testCases := []testCase{} - clientMock := NewL2Mock(t) - ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) - - // case 0: single block, no events - case0 := testCase{ - description: "case 0: single block, no events", - inputLogs: []types.Log{}, - fromBlock: 1, - toBlock: 3, - expectedBlocks: []block{}, - } - testCases = append(testCases, case0) - - // case 1: single block, single event - logC1, bridgeC1 := generateBridge(t, 3) - logsC1 := []types.Log{ - *logC1, - } - blocksC1 := []block{ - { - blockHeader: blockHeader{ - Num: logC1.BlockNumber, - Hash: logC1.BlockHash, - }, - Events: []BridgeEvent{ - { - Bridge: &bridgeC1, - }, - }, - }, - } - case1 := testCase{ - description: "case 1: single block, single event", - inputLogs: logsC1, - fromBlock: 3, - toBlock: 3, - expectedBlocks: blocksC1, - } - testCases = append(testCases, case1) - - // case 2: single block, multiple events - logC2_1, bridgeC2_1 := 
generateBridge(t, 5) - logC2_2, bridgeC2_2 := generateBridge(t, 5) - logC2_3, claimC2_1 := generateClaimV1(t, 5) - logC2_4, claimC2_2 := generateClaimV2(t, 5) - logsC2 := []types.Log{ - *logC2_1, - *logC2_2, - *logC2_3, - *logC2_4, - } - blocksC2 := []block{ - { - blockHeader: blockHeader{ - Num: logC2_1.BlockNumber, - Hash: logC2_1.BlockHash, - }, - Events: []BridgeEvent{ - {Bridge: &bridgeC2_1}, - {Bridge: &bridgeC2_2}, - {Claim: &claimC2_1}, - {Claim: &claimC2_2}, - }, - }, - } - case2 := testCase{ - description: "case 2: single block, multiple events", - inputLogs: logsC2, - fromBlock: 5, - toBlock: 5, - expectedBlocks: blocksC2, - } - testCases = append(testCases, case2) - - // case 3: multiple blocks, some events - logC3_1, bridgeC3_1 := generateBridge(t, 7) - logC3_2, bridgeC3_2 := generateBridge(t, 7) - logC3_3, claimC3_1 := generateClaimV1(t, 8) - logC3_4, claimC3_2 := generateClaimV2(t, 8) - logsC3 := []types.Log{ - *logC3_1, - *logC3_2, - *logC3_3, - *logC3_4, - } - blocksC3 := []block{ - { - blockHeader: blockHeader{ - Num: logC3_1.BlockNumber, - Hash: logC3_1.BlockHash, - }, - Events: []BridgeEvent{ - {Bridge: &bridgeC3_1}, - {Bridge: &bridgeC3_2}, - }, - }, - { - blockHeader: blockHeader{ - Num: logC3_3.BlockNumber, - Hash: logC3_3.BlockHash, - }, - Events: []BridgeEvent{ - {Claim: &claimC3_1}, - {Claim: &claimC3_2}, - }, - }, - } - case3 := testCase{ - description: "case 3: multiple blocks, some events", - inputLogs: logsC3, - fromBlock: 7, - toBlock: 8, - expectedBlocks: blocksC3, - } - testCases = append(testCases, case3) - - for _, tc := range testCases { - query := ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(tc.fromBlock), - Addresses: []common.Address{contractAddr}, - Topics: [][]common.Hash{ - {bridgeEventSignature}, - {claimEventSignature}, - {claimEventSignaturePreEtrog}, - }, - ToBlock: new(big.Int).SetUint64(tc.toBlock), - } - clientMock. - On("FilterLogs", mock.Anything, query). 
- Return(tc.inputLogs, nil) - - actualBlocks := d.getEventsByBlockRange(ctx, tc.fromBlock, tc.toBlock) - require.Equal(t, tc.expectedBlocks, actualBlocks, tc.description) - } -} - -func generateBridge(t *testing.T, blockNum uint32) (*types.Log, Bridge) { - b := Bridge{ - LeafType: 1, - OriginNetwork: blockNum, - OriginAddress: contractAddr, - DestinationNetwork: blockNum, - DestinationAddress: contractAddr, - Amount: big.NewInt(int64(blockNum)), - Metadata: common.Hex2Bytes("01"), - DepositCount: blockNum, - } - abi, err := polygonzkevmbridgev2.Polygonzkevmbridgev2MetaData.GetAbi() - require.NoError(t, err) - event, err := abi.EventByID(bridgeEventSignature) - require.NoError(t, err) - data, err := event.Inputs.Pack( - b.LeafType, - b.OriginNetwork, - b.OriginAddress, - b.DestinationNetwork, - b.DestinationAddress, - b.Amount, - b.Metadata, - b.DepositCount, - ) - require.NoError(t, err) - log := &types.Log{ - Address: contractAddr, - BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), - Topics: []common.Hash{bridgeEventSignature}, - Data: data, - } - return log, b -} - -func generateClaimV1(t *testing.T, blockNum uint32) (*types.Log, Claim) { - abi, err := polygonzkevmbridge.PolygonzkevmbridgeMetaData.GetAbi() - require.NoError(t, err) - event, err := abi.EventByID(claimEventSignaturePreEtrog) - require.NoError(t, err) - return generateClaim(t, blockNum, event, true) -} - -func generateClaimV2(t *testing.T, blockNum uint32) (*types.Log, Claim) { - abi, err := polygonzkevmbridgev2.Polygonzkevmbridgev2MetaData.GetAbi() - require.NoError(t, err) - event, err := abi.EventByID(claimEventSignature) - require.NoError(t, err) - return generateClaim(t, blockNum, event, false) -} - -func generateClaim(t *testing.T, blockNum uint32, event *abi.Event, isV1 bool) (*types.Log, Claim) { - c := Claim{ - GlobalIndex: big.NewInt(int64(blockNum)), - OriginNetwork: blockNum, - OriginAddress: contractAddr, - DestinationAddress: 
contractAddr, - Amount: big.NewInt(int64(blockNum)), - } - var ( - data []byte - err error - signature common.Hash - ) - if isV1 { - data, err = event.Inputs.Pack( - uint32(c.GlobalIndex.Uint64()), - c.OriginNetwork, - c.OriginAddress, - c.DestinationAddress, - c.Amount, - ) - signature = claimEventSignaturePreEtrog - } else { - data, err = event.Inputs.Pack( - c.GlobalIndex, - c.OriginNetwork, - c.OriginAddress, - c.DestinationAddress, - c.Amount, - ) - signature = claimEventSignature - } - require.NoError(t, err) - log := &types.Log{ - Address: contractAddr, - BlockNumber: uint64(blockNum), - BlockHash: common.BytesToHash(cdkcommon.BlockNum2Bytes(uint64(blockNum))), - Topics: []common.Hash{signature}, - Data: data, - } - return log, c -} - -func TestDownload(t *testing.T) { - /* - NOTE: due to the concurrent nature of this test (the function being tested runs through a goroutine) - if the mock doesn't match, the goroutine will get stuck and the test will timeout - */ - d := NewDownloaderMock(t) - downloadCh := make(chan block, 1) - ctx := context.Background() - ctx1, cancel := context.WithCancel(ctx) - expectedBlocks := []block{} - clientMock := NewL2Mock(t) - dwnldr, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) - dwnldr.downloaderInterface = d - - d.On("waitForNewBlocks", mock.Anything, uint64(0)). - Return(uint64(1)) - // iteratiion 0: - // last block is 1, download that block (no events and wait) - b1 := block{ - blockHeader: blockHeader{ - Num: 1, - Hash: common.HexToHash("01"), - }, - } - expectedBlocks = append(expectedBlocks, b1) - d.On("getEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return([]block{}) - d.On("getBlockHeader", mock.Anything, uint64(1)). - Return(b1.blockHeader) - - // iteration 1: wait for next block to be created - d.On("waitForNewBlocks", mock.Anything, uint64(1)). - After(time.Millisecond * 100). 
- Return(uint64(2)).Once() - - // iteration 2: block 2 has events - b2 := block{ - blockHeader: blockHeader{ - Num: 2, - Hash: common.HexToHash("02"), - }, - } - expectedBlocks = append(expectedBlocks, b2) - d.On("getEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return([]block{b2}) - - // iteration 3: wait for next block to be created (jump to block 8) - d.On("waitForNewBlocks", mock.Anything, uint64(2)). - After(time.Millisecond * 100). - Return(uint64(8)).Once() - - // iteration 4: blocks 6 and 7 have events - b6 := block{ - blockHeader: blockHeader{ - Num: 6, - Hash: common.HexToHash("06"), - }, - Events: []BridgeEvent{ - {Claim: &Claim{OriginNetwork: 6}}, - }, - } - b7 := block{ - blockHeader: blockHeader{ - Num: 7, - Hash: common.HexToHash("07"), - }, - Events: []BridgeEvent{ - {Bridge: &Bridge{DestinationNetwork: 7}}, - }, - } - b8 := block{ - blockHeader: blockHeader{ - Num: 8, - Hash: common.HexToHash("08"), - }, - } - expectedBlocks = append(expectedBlocks, b6, b7, b8) - d.On("getEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return([]block{b6, b7}) - d.On("getBlockHeader", mock.Anything, uint64(8)). - Return(b8.blockHeader) - - // iteration 5: wait for next block to be created (jump to block 30) - d.On("waitForNewBlocks", mock.Anything, uint64(8)). - After(time.Millisecond * 100). - Return(uint64(30)).Once() - - // iteration 6: from block 9 to 19, no events - b19 := block{ - blockHeader: blockHeader{ - Num: 19, - Hash: common.HexToHash("19"), - }, - } - expectedBlocks = append(expectedBlocks, b19) - d.On("getEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return([]block{}) - d.On("getBlockHeader", mock.Anything, uint64(19)). 
- Return(b19.blockHeader) - - // iteration 7: from block 20 to 30, events on last block - b30 := block{ - blockHeader: blockHeader{ - Num: 30, - Hash: common.HexToHash("30"), - }, - Events: []BridgeEvent{ - {Bridge: &Bridge{DestinationNetwork: 30}}, - }, - } - expectedBlocks = append(expectedBlocks, b30) - d.On("getEventsByBlockRange", mock.Anything, uint64(20), uint64(30)). - Return([]block{b30}) - - // iteration 8: wait for next block to be created (jump to block 35) - d.On("waitForNewBlocks", mock.Anything, uint64(30)). - After(time.Millisecond * 100). - Return(uint64(35)).Once() - - go dwnldr.download(ctx1, 0, downloadCh) - for _, expectedBlock := range expectedBlocks { - actualBlock := <-downloadCh - log.Debugf("block %d received!", actualBlock.Num) - require.Equal(t, expectedBlock, actualBlock) - } - log.Debug("canceling") - cancel() - _, ok := <-downloadCh - require.False(t, ok) -} - -func TestWaitForNewBlocks(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - clientMock := NewL2Mock(t) - ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) - - // at first attempt - currentBlock := uint64(5) - expectedBlock := uint64(6) - clientMock.On("HeaderByNumber", ctx, mock.Anything).Return(&types.Header{ - Number: big.NewInt(6), - }, nil).Once() - actualBlock := d.waitForNewBlocks(ctx, currentBlock) - assert.Equal(t, expectedBlock, actualBlock) - - // 2 iterations - clientMock.On("HeaderByNumber", ctx, mock.Anything).Return(&types.Header{ - Number: big.NewInt(5), - }, nil).Once() - clientMock.On("HeaderByNumber", ctx, mock.Anything).Return(&types.Header{ - Number: big.NewInt(6), - }, nil).Once() - actualBlock = d.waitForNewBlocks(ctx, currentBlock) - assert.Equal(t, expectedBlock, actualBlock) - - // after error from client - clientMock.On("HeaderByNumber", ctx, mock.Anything).Return(nil, errors.New("foo")).Once() - clientMock.On("HeaderByNumber", ctx, 
mock.Anything).Return(&types.Header{ - Number: big.NewInt(6), - }, nil).Once() - actualBlock = d.waitForNewBlocks(ctx, currentBlock) - assert.Equal(t, expectedBlock, actualBlock) -} - -func TestGetBlockHeader(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - clientMock := NewL2Mock(t) - ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) - - blockNum := uint64(5) - blockNumBig := big.NewInt(5) - returnedBlock := &types.Header{ - Number: blockNumBig, - } - expectedBlock := blockHeader{ - Num: 5, - Hash: returnedBlock.Hash(), - } - - // at first attempt - clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(returnedBlock, nil).Once() - actualBlock := d.getBlockHeader(ctx, blockNum) - assert.Equal(t, expectedBlock, actualBlock) - - // after error from client - clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(nil, errors.New("foo")).Once() - clientMock.On("HeaderByNumber", ctx, blockNumBig).Return(returnedBlock, nil).Once() - actualBlock = d.getBlockHeader(ctx, blockNum) - assert.Equal(t, expectedBlock, actualBlock) -} diff --git a/localbridgesync/localbridgesync.go b/localbridgesync/localbridgesync.go index 42ab67d1..6707bbad 100644 --- a/localbridgesync/localbridgesync.go +++ b/localbridgesync/localbridgesync.go @@ -1,21 +1,27 @@ package localbridgesync import ( + "context" "time" "github.com/0xPolygon/cdk/etherman" - "github.com/0xPolygon/cdk/log" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum/common" ) +const ( + reorgDetectorID = "localbridgesync" + downloadBufferSize = 1000 +) + var ( retryAfterErrorPeriod = time.Second * 10 maxRetryAttemptsAfterError = 5 ) type LocalBridgeSync struct { - *processor - *driver + processor *processor + driver *sync.EVMDriver } func New( @@ -23,30 +29,40 @@ func New( bridge common.Address, syncBlockChunkSize uint64, blockFinalityType etherman.BlockNumberFinality, - rd ReorgDetector, + rd 
sync.ReorgDetector, l2Client EthClienter, ) (*LocalBridgeSync, error) { - p, err := newProcessor(dbPath) + processor, err := newProcessor(dbPath) if err != nil { return nil, err } - dwn, err := newDownloader(bridge, l2Client, syncBlockChunkSize, blockFinalityType) + + appender, err := buildAppender(l2Client, bridge) if err != nil { return nil, err } - dri, err := newDriver(rd, p, dwn) + downloader, err := sync.NewEVMDownloader( + l2Client, + syncBlockChunkSize, + blockFinalityType, + waitForNewBlocksPeriod, + appender, + []common.Address{bridge}, + ) if err != nil { return nil, err } - return &LocalBridgeSync{p, dri}, nil -} -func retryHandler(funcName string, attempts int) { - if attempts >= maxRetryAttemptsAfterError { - log.Fatalf( - "%s failed too many times (%d)", - funcName, maxRetryAttemptsAfterError, - ) + driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize) + if err != nil { + return nil, err } - time.Sleep(retryAfterErrorPeriod) + return &LocalBridgeSync{ + processor: processor, + driver: driver, + }, nil +} + +func (s *LocalBridgeSync) Start(ctx context.Context) { + s.driver.Sync(ctx) } diff --git a/localbridgesync/mock_downloader_test.go b/localbridgesync/mock_downloader_test.go deleted file mode 100644 index f2df97d0..00000000 --- a/localbridgesync/mock_downloader_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. 
- -package localbridgesync - -import ( - context "context" - - types "github.com/ethereum/go-ethereum/core/types" - mock "github.com/stretchr/testify/mock" -) - -// DownloaderMock is an autogenerated mock type for the downloaderFull type -type DownloaderMock struct { - mock.Mock -} - -// appendLog provides a mock function with given fields: b, l -func (_m *DownloaderMock) appendLog(b *block, l types.Log) { - _m.Called(b, l) -} - -// download provides a mock function with given fields: ctx, fromBlock, downloadedCh -func (_m *DownloaderMock) download(ctx context.Context, fromBlock uint64, downloadedCh chan block) { - _m.Called(ctx, fromBlock, downloadedCh) -} - -// getBlockHeader provides a mock function with given fields: ctx, blockNum -func (_m *DownloaderMock) getBlockHeader(ctx context.Context, blockNum uint64) blockHeader { - ret := _m.Called(ctx, blockNum) - - var r0 blockHeader - if rf, ok := ret.Get(0).(func(context.Context, uint64) blockHeader); ok { - r0 = rf(ctx, blockNum) - } else { - r0 = ret.Get(0).(blockHeader) - } - - return r0 -} - -// getEventsByBlockRange provides a mock function with given fields: ctx, fromBlock, toBlock -func (_m *DownloaderMock) getEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []block { - ret := _m.Called(ctx, fromBlock, toBlock) - - var r0 []block - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []block); ok { - r0 = rf(ctx, fromBlock, toBlock) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]block) - } - } - - return r0 -} - -// getLogs provides a mock function with given fields: ctx, fromBlock, toBlock -func (_m *DownloaderMock) getLogs(ctx context.Context, fromBlock uint64, toBlock uint64) []types.Log { - ret := _m.Called(ctx, fromBlock, toBlock) - - var r0 []types.Log - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []types.Log); ok { - r0 = rf(ctx, fromBlock, toBlock) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Log) - } - } - - return r0 
-} - -// waitForNewBlocks provides a mock function with given fields: ctx, lastBlockSeen -func (_m *DownloaderMock) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) uint64 { - ret := _m.Called(ctx, lastBlockSeen) - - var r0 uint64 - if rf, ok := ret.Get(0).(func(context.Context, uint64) uint64); ok { - r0 = rf(ctx, lastBlockSeen) - } else { - r0 = ret.Get(0).(uint64) - } - - return r0 -} - -type mockConstructorTestingTNewDownloaderMock interface { - mock.TestingT - Cleanup(func()) -} - -// NewDownloaderMock creates a new instance of DownloaderMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewDownloaderMock(t mockConstructorTestingTNewDownloaderMock) *DownloaderMock { - mock := &DownloaderMock{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/localbridgesync/mock_l2_test.go b/localbridgesync/mock_l2_test.go deleted file mode 100644 index 78baa614..00000000 --- a/localbridgesync/mock_l2_test.go +++ /dev/null @@ -1,484 +0,0 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. 
- -package localbridgesync - -import ( - context "context" - big "math/big" - - common "github.com/ethereum/go-ethereum/common" - - ethereum "github.com/ethereum/go-ethereum" - - mock "github.com/stretchr/testify/mock" - - types "github.com/ethereum/go-ethereum/core/types" -) - -// L2Mock is an autogenerated mock type for the EthClienter type -type L2Mock struct { - mock.Mock -} - -// BlockByHash provides a mock function with given fields: ctx, hash -func (_m *L2Mock) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - ret := _m.Called(ctx, hash) - - var r0 *types.Block - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Block, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Block); ok { - r0 = rf(ctx, hash) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Block) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// BlockByNumber provides a mock function with given fields: ctx, number -func (_m *L2Mock) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - ret := _m.Called(ctx, number) - - var r0 *types.Block - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { - return rf(ctx, number) - } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Block); ok { - r0 = rf(ctx, number) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Block) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, number) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// BlockNumber provides a mock function with given fields: ctx -func (_m *L2Mock) BlockNumber(ctx context.Context) (uint64, error) { - ret := _m.Called(ctx) - - var r0 uint64 - var r1 error - if rf, ok := 
ret.Get(0).(func(context.Context) (uint64, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// CallContract provides a mock function with given fields: ctx, call, blockNumber -func (_m *L2Mock) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - ret := _m.Called(ctx, call, blockNumber) - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg, *big.Int) ([]byte, error)); ok { - return rf(ctx, call, blockNumber) - } - if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg, *big.Int) []byte); ok { - r0 = rf(ctx, call, blockNumber) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, ethereum.CallMsg, *big.Int) error); ok { - r1 = rf(ctx, call, blockNumber) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// CodeAt provides a mock function with given fields: ctx, contract, blockNumber -func (_m *L2Mock) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { - ret := _m.Called(ctx, contract, blockNumber) - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]byte, error)); ok { - return rf(ctx, contract, blockNumber) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []byte); ok { - r0 = rf(ctx, contract, blockNumber) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { - r1 = rf(ctx, contract, blockNumber) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// EstimateGas provides a mock function with given fields: ctx, 
call -func (_m *L2Mock) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error) { - ret := _m.Called(ctx, call) - - var r0 uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg) (uint64, error)); ok { - return rf(ctx, call) - } - if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg) uint64); ok { - r0 = rf(ctx, call) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context, ethereum.CallMsg) error); ok { - r1 = rf(ctx, call) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// FilterLogs provides a mock function with given fields: ctx, q -func (_m *L2Mock) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - ret := _m.Called(ctx, q) - - var r0 []types.Log - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery) ([]types.Log, error)); ok { - return rf(ctx, q) - } - if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery) []types.Log); ok { - r0 = rf(ctx, q) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Log) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, ethereum.FilterQuery) error); ok { - r1 = rf(ctx, q) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// HeaderByHash provides a mock function with given fields: ctx, hash -func (_m *L2Mock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - ret := _m.Called(ctx, hash) - - var r0 *types.Header - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Header); ok { - r0 = rf(ctx, hash) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Header) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// HeaderByNumber 
provides a mock function with given fields: ctx, number -func (_m *L2Mock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - ret := _m.Called(ctx, number) - - var r0 *types.Header - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { - return rf(ctx, number) - } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) *types.Header); ok { - r0 = rf(ctx, number) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Header) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, number) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// PendingCodeAt provides a mock function with given fields: ctx, account -func (_m *L2Mock) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { - ret := _m.Called(ctx, account) - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Address) ([]byte, error)); ok { - return rf(ctx, account) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Address) []byte); ok { - r0 = rf(ctx, account) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { - r1 = rf(ctx, account) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// PendingNonceAt provides a mock function with given fields: ctx, account -func (_m *L2Mock) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { - ret := _m.Called(ctx, account) - - var r0 uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Address) (uint64, error)); ok { - return rf(ctx, account) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Address) uint64); ok { - r0 = rf(ctx, account) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Address) error); ok { - r1 = rf(ctx, account) - } else { - r1 = 
ret.Error(1) - } - - return r0, r1 -} - -// SendTransaction provides a mock function with given fields: ctx, tx -func (_m *L2Mock) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ret := _m.Called(ctx, tx) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) error); ok { - r0 = rf(ctx, tx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// SubscribeFilterLogs provides a mock function with given fields: ctx, q, ch -func (_m *L2Mock) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { - ret := _m.Called(ctx, q, ch) - - var r0 ethereum.Subscription - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error)); ok { - return rf(ctx, q, ch) - } - if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery, chan<- types.Log) ethereum.Subscription); ok { - r0 = rf(ctx, q, ch) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(ethereum.Subscription) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, ethereum.FilterQuery, chan<- types.Log) error); ok { - r1 = rf(ctx, q, ch) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// SubscribeNewHead provides a mock function with given fields: ctx, ch -func (_m *L2Mock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - ret := _m.Called(ctx, ch) - - var r0 ethereum.Subscription - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { - return rf(ctx, ch) - } - if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) ethereum.Subscription); ok { - r0 = rf(ctx, ch) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(ethereum.Subscription) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, chan<- *types.Header) error); ok { - r1 = rf(ctx, ch) - } else { - r1 = ret.Error(1) 
- } - - return r0, r1 -} - -// SuggestGasPrice provides a mock function with given fields: ctx -func (_m *L2Mock) SuggestGasPrice(ctx context.Context) (*big.Int, error) { - ret := _m.Called(ctx) - - var r0 *big.Int - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) *big.Int); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*big.Int) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// SuggestGasTipCap provides a mock function with given fields: ctx -func (_m *L2Mock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { - ret := _m.Called(ctx) - - var r0 *big.Int - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) *big.Int); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*big.Int) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// TransactionCount provides a mock function with given fields: ctx, blockHash -func (_m *L2Mock) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { - ret := _m.Called(ctx, blockHash) - - var r0 uint - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (uint, error)); ok { - return rf(ctx, blockHash) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) uint); ok { - r0 = rf(ctx, blockHash) - } else { - r0 = ret.Get(0).(uint) - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { - r1 = rf(ctx, blockHash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// TransactionInBlock provides a mock function with given fields: ctx, blockHash, index -func (_m *L2Mock) TransactionInBlock(ctx 
context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { - ret := _m.Called(ctx, blockHash, index) - - var r0 *types.Transaction - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) (*types.Transaction, error)); ok { - return rf(ctx, blockHash, index) - } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) *types.Transaction); ok { - r0 = rf(ctx, blockHash, index) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Transaction) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, common.Hash, uint) error); ok { - r1 = rf(ctx, blockHash, index) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -type mockConstructorTestingTNewL2Mock interface { - mock.TestingT - Cleanup(func()) -} - -// NewL2Mock creates a new instance of L2Mock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewL2Mock(t mockConstructorTestingTNewL2Mock) *L2Mock { - mock := &L2Mock{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/localbridgesync/mock_processor_test.go b/localbridgesync/mock_processor_test.go deleted file mode 100644 index 4a629f5c..00000000 --- a/localbridgesync/mock_processor_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. 
- -package localbridgesync - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) - -// ProcessorMock is an autogenerated mock type for the processorInterface type -type ProcessorMock struct { - mock.Mock -} - -// getLastProcessedBlock provides a mock function with given fields: ctx -func (_m *ProcessorMock) getLastProcessedBlock(ctx context.Context) (uint64, error) { - ret := _m.Called(ctx) - - var r0 uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(uint64) - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// reorg provides a mock function with given fields: firstReorgedBlock -func (_m *ProcessorMock) reorg(firstReorgedBlock uint64) error { - ret := _m.Called(firstReorgedBlock) - - var r0 error - if rf, ok := ret.Get(0).(func(uint64) error); ok { - r0 = rf(firstReorgedBlock) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// storeBridgeEvents provides a mock function with given fields: blockNum, events -func (_m *ProcessorMock) storeBridgeEvents(blockNum uint64, events []BridgeEvent) error { - ret := _m.Called(blockNum, events) - - var r0 error - if rf, ok := ret.Get(0).(func(uint64, []BridgeEvent) error); ok { - r0 = rf(blockNum, events) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -type mockConstructorTestingTNewProcessorMock interface { - mock.TestingT - Cleanup(func()) -} - -// NewProcessorMock creates a new instance of ProcessorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
-func NewProcessorMock(t mockConstructorTestingTNewProcessorMock) *ProcessorMock { - mock := &ProcessorMock{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/localbridgesync/processor.go b/localbridgesync/processor.go index 69c885b8..443a7925 100644 --- a/localbridgesync/processor.go +++ b/localbridgesync/processor.go @@ -6,6 +6,7 @@ import ( "errors" "github.com/0xPolygon/cdk/common" + "github.com/0xPolygon/cdk/sync" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" ) @@ -87,7 +88,7 @@ func (p *processor) GetClaimsAndBridges( return events, nil } -func (p *processor) getLastProcessedBlock(ctx context.Context) (uint64, error) { +func (p *processor) GetLastProcessedBlock(ctx context.Context) (uint64, error) { tx, err := p.db.BeginRo(ctx) if err != nil { return 0, err @@ -106,7 +107,7 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { } } -func (p *processor) reorg(firstReorgedBlock uint64) error { +func (p *processor) Reorg(firstReorgedBlock uint64) error { tx, err := p.db.BeginRw(context.Background()) if err != nil { return err @@ -134,23 +135,27 @@ func (p *processor) reorg(firstReorgedBlock uint64) error { return tx.Commit() } -func (p *processor) storeBridgeEvents(blockNum uint64, events []BridgeEvent) error { +func (p *processor) ProcessBlock(block sync.EVMBlock) error { tx, err := p.db.BeginRw(context.Background()) if err != nil { return err } - if len(events) > 0 { + if len(block.Events) > 0 { + events := []BridgeEvent{} + for _, e := range block.Events { + events = append(events, e.(BridgeEvent)) + } value, err := json.Marshal(events) if err != nil { tx.Rollback() return err } - if err := tx.Put(eventsTable, common.BlockNum2Bytes(blockNum), value); err != nil { + if err := tx.Put(eventsTable, common.BlockNum2Bytes(block.Num), value); err != nil { tx.Rollback() return err } } - if err := p.updateLastProcessedBlock(tx, blockNum); err != nil { + 
if err := p.updateLastProcessedBlock(tx, block.Num); err != nil { tx.Rollback() return err } diff --git a/localbridgesync/processor_test.go b/localbridgesync/processor_test.go index 8e6884c2..a7835256 100644 --- a/localbridgesync/processor_test.go +++ b/localbridgesync/processor_test.go @@ -7,6 +7,7 @@ import ( "slices" "testing" + "github.com/0xPolygon/cdk/sync" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" ) @@ -45,11 +46,10 @@ func TestProceessor(t *testing.T) { expectedEvents: nil, expectedErr: ErrBlockNotProcessed, }, - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block1", - blockNum: block1.Num, - events: block1.Events, + block: block1, expectedErr: nil, }, // processed: block1 @@ -75,7 +75,7 @@ func TestProceessor(t *testing.T) { ctx: context.Background(), fromBlock: 1, toBlock: 1, - expectedEvents: block1.Events, + expectedEvents: eventsToBridgeEvents(block1.Events), expectedErr: nil, }, &reorgAction{ @@ -94,19 +94,17 @@ func TestProceessor(t *testing.T) { expectedEvents: nil, expectedErr: ErrBlockNotProcessed, }, - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block1 (after it's reorged)", - blockNum: block1.Num, - events: block1.Events, + block: block1, expectedErr: nil, }, // processed: block3 - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block3", - blockNum: block3.Num, - events: block3.Events, + block: block3, expectedErr: nil, }, // processed: block1, block3 @@ -127,13 +125,16 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block3: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedEvents: append(block1.Events, block3.Events...), - expectedErr: nil, + p: p, + description: "after block3: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedEvents: append( + eventsToBridgeEvents(block1.Events), + 
eventsToBridgeEvents(block3.Events)..., + ), + expectedErr: nil, }, &reorgAction{ p: p, @@ -162,27 +163,24 @@ func TestProceessor(t *testing.T) { expectedLastProcessedBlock: 1, expectedErr: nil, }, - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block3 after reorg", - blockNum: block3.Num, - events: block3.Events, + block: block3, expectedErr: nil, }, // processed: block1, block3 - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block4", - blockNum: block4.Num, - events: block4.Events, + block: block4, expectedErr: nil, }, // processed: block1, block3, block4 - &storeBridgeEventsAction{ + &processBlockAction{ p: p, description: "block5", - blockNum: block5.Num, - events: block5.Events, + block: block5, expectedErr: nil, }, // processed: block1, block3, block4, block5 @@ -194,22 +192,28 @@ func TestProceessor(t *testing.T) { expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 1, 3", - ctx: context.Background(), - fromBlock: 1, - toBlock: 3, - expectedEvents: append(block1.Events, block3.Events...), - expectedErr: nil, + p: p, + description: "after block5: range 1, 3", + ctx: context.Background(), + fromBlock: 1, + toBlock: 3, + expectedEvents: append( + eventsToBridgeEvents(block1.Events), + eventsToBridgeEvents(block3.Events)..., + ), + expectedErr: nil, }, &getClaimsAndBridgesAction{ - p: p, - description: "after block5: range 4, 5", - ctx: context.Background(), - fromBlock: 4, - toBlock: 5, - expectedEvents: append(block4.Events, block5.Events...), - expectedErr: nil, + p: p, + description: "after block5: range 4, 5", + ctx: context.Background(), + fromBlock: 4, + toBlock: 5, + expectedEvents: append( + eventsToBridgeEvents(block4.Events), + eventsToBridgeEvents(block5.Events)..., + ), + expectedErr: nil, }, &getClaimsAndBridgesAction{ p: p, @@ -218,10 +222,10 @@ func TestProceessor(t *testing.T) { fromBlock: 0, toBlock: 5, expectedEvents: slices.Concat( - block1.Events, - 
block3.Events, - block4.Events, - block5.Events, + eventsToBridgeEvents(block1.Events), + eventsToBridgeEvents(block3.Events), + eventsToBridgeEvents(block4.Events), + eventsToBridgeEvents(block5.Events), ), expectedErr: nil, }, @@ -237,13 +241,13 @@ func TestProceessor(t *testing.T) { // blocks var ( - block1 = block{ - blockHeader: blockHeader{ + block1 = sync.EVMBlock{ + EVMBlockHeader: sync.EVMBlockHeader{ Num: 1, Hash: common.HexToHash("01"), }, - Events: []BridgeEvent{ - {Bridge: &Bridge{ + Events: []interface{}{ + BridgeEvent{Bridge: &Bridge{ LeafType: 1, OriginNetwork: 1, OriginAddress: common.HexToAddress("01"), @@ -253,7 +257,7 @@ var ( Metadata: common.Hex2Bytes("01"), DepositCount: 1, }}, - {Claim: &Claim{ + BridgeEvent{Claim: &Claim{ GlobalIndex: big.NewInt(1), OriginNetwork: 1, OriginAddress: common.HexToAddress("01"), @@ -262,13 +266,13 @@ var ( }}, }, } - block3 = block{ - blockHeader: blockHeader{ + block3 = sync.EVMBlock{ + EVMBlockHeader: sync.EVMBlockHeader{ Num: 3, Hash: common.HexToHash("02"), }, - Events: []BridgeEvent{ - {Bridge: &Bridge{ + Events: []interface{}{ + BridgeEvent{Bridge: &Bridge{ LeafType: 2, OriginNetwork: 2, OriginAddress: common.HexToAddress("02"), @@ -278,7 +282,7 @@ var ( Metadata: common.Hex2Bytes("02"), DepositCount: 2, }}, - {Bridge: &Bridge{ + BridgeEvent{Bridge: &Bridge{ LeafType: 3, OriginNetwork: 3, OriginAddress: common.HexToAddress("03"), @@ -290,27 +294,27 @@ var ( }}, }, } - block4 = block{ - blockHeader: blockHeader{ + block4 = sync.EVMBlock{ + EVMBlockHeader: sync.EVMBlockHeader{ Num: 4, Hash: common.HexToHash("03"), }, - Events: []BridgeEvent{}, + Events: []interface{}{}, } - block5 = block{ - blockHeader: blockHeader{ + block5 = sync.EVMBlock{ + EVMBlockHeader: sync.EVMBlockHeader{ Num: 5, Hash: common.HexToHash("04"), }, - Events: []BridgeEvent{ - {Claim: &Claim{ + Events: []interface{}{ + BridgeEvent{Claim: &Claim{ GlobalIndex: big.NewInt(4), OriginNetwork: 4, OriginAddress: common.HexToAddress("04"), 
DestinationAddress: common.HexToAddress("04"), Amount: big.NewInt(4), }}, - {Claim: &Claim{ + BridgeEvent{Claim: &Claim{ GlobalIndex: big.NewInt(5), OriginNetwork: 5, OriginAddress: common.HexToAddress("05"), @@ -374,7 +378,7 @@ func (a *getLastProcessedBlockAction) desc() string { } func (a *getLastProcessedBlockAction) execute(t *testing.T) { - actualLastProcessedBlock, actualErr := a.p.getLastProcessedBlock(a.ctx) + actualLastProcessedBlock, actualErr := a.p.GetLastProcessedBlock(a.ctx) require.Equal(t, a.expectedLastProcessedBlock, actualLastProcessedBlock) require.Equal(t, a.expectedErr, actualErr) } @@ -397,29 +401,36 @@ func (a *reorgAction) desc() string { } func (a *reorgAction) execute(t *testing.T) { - actualErr := a.p.reorg(a.firstReorgedBlock) + actualErr := a.p.Reorg(a.firstReorgedBlock) require.Equal(t, a.expectedErr, actualErr) } // storeBridgeEvents -type storeBridgeEventsAction struct { +type processBlockAction struct { p *processor description string - blockNum uint64 - events []BridgeEvent + block sync.EVMBlock expectedErr error } -func (a *storeBridgeEventsAction) method() string { +func (a *processBlockAction) method() string { return "storeBridgeEvents" } -func (a *storeBridgeEventsAction) desc() string { +func (a *processBlockAction) desc() string { return a.description } -func (a *storeBridgeEventsAction) execute(t *testing.T) { - actualErr := a.p.storeBridgeEvents(a.blockNum, a.events) +func (a *processBlockAction) execute(t *testing.T) { + actualErr := a.p.ProcessBlock(a.block) require.Equal(t, a.expectedErr, actualErr) } + +func eventsToBridgeEvents(events []interface{}) []BridgeEvent { + bridgeEvents := []BridgeEvent{} + for _, event := range events { + bridgeEvents = append(bridgeEvents, event.(BridgeEvent)) + } + return bridgeEvents +} diff --git a/localbridgesync/types.go b/localbridgesync/types.go deleted file mode 100644 index 3a6a508e..00000000 --- a/localbridgesync/types.go +++ /dev/null @@ -1,42 +0,0 @@ -package localbridgesync 
- -import ( - "math/big" - - "github.com/ethereum/go-ethereum/common" -) - -type Bridge struct { - LeafType uint8 - OriginNetwork uint32 - OriginAddress common.Address - DestinationNetwork uint32 - DestinationAddress common.Address - Amount *big.Int - Metadata []byte - DepositCount uint32 -} - -type Claim struct { - // TODO: pre uLxLy there was Index instead of GlobalIndex, should we treat this differently? - GlobalIndex *big.Int - OriginNetwork uint32 - OriginAddress common.Address - DestinationAddress common.Address - Amount *big.Int -} - -type BridgeEvent struct { - Bridge *Bridge - Claim *Claim -} - -type block struct { - blockHeader - Events []BridgeEvent -} - -type blockHeader struct { - Num uint64 - Hash common.Hash -} diff --git a/sync/common.go b/sync/common.go new file mode 100644 index 00000000..6d1011f5 --- /dev/null +++ b/sync/common.go @@ -0,0 +1,21 @@ +package sync + +import ( + "log" + "time" +) + +var ( + RetryAfterErrorPeriod = time.Second * 10 + MaxRetryAttemptsAfterError = 5 +) + +func RetryHandler(funcName string, attempts int) { + if attempts >= MaxRetryAttemptsAfterError { + log.Fatalf( + "%s failed too many times (%d)", + funcName, MaxRetryAttemptsAfterError, + ) + } + time.Sleep(RetryAfterErrorPeriod) +} diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go new file mode 100644 index 00000000..e6d9dc8d --- /dev/null +++ b/sync/evmdownloader.go @@ -0,0 +1,198 @@ +package sync + +import ( + "context" + "math/big" + "time" + + "github.com/0xPolygon/cdk/etherman" + "github.com/0xPolygon/cdk/log" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type EthClienter interface { + ethereum.LogFilterer + ethereum.BlockNumberReader + ethereum.ChainReader + bind.ContractBackend +} + +type evmDownloaderInterface interface { + waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) + 
getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock + getLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log + getBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader +} + +type LogAppenderMap map[common.Hash]func(b *EVMBlock, l types.Log) error + +type EVMDownloader struct { + syncBlockChunkSize uint64 + evmDownloaderInterface +} + +func NewEVMDownloader( + ethClient EthClienter, + syncBlockChunkSize uint64, + blockFinalityType etherman.BlockNumberFinality, + waitForNewBlocksPeriod time.Duration, + appender LogAppenderMap, + adressessToQuery []common.Address, +) (*EVMDownloader, error) { + finality, err := blockFinalityType.ToBlockNum() + if err != nil { + return nil, err + } + topicsToQuery := [][]common.Hash{} + for topic := range appender { + topicsToQuery = append(topicsToQuery, []common.Hash{topic}) + } + return &EVMDownloader{ + syncBlockChunkSize: syncBlockChunkSize, + evmDownloaderInterface: &downloaderImplementation{ + ethClient: ethClient, + blockFinality: finality, + waitForNewBlocksPeriod: waitForNewBlocksPeriod, + appender: appender, + topicsToQuery: topicsToQuery, + adressessToQuery: adressessToQuery, + }, + }, nil +} + +func (d *EVMDownloader) download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { + lastBlock := d.waitForNewBlocks(ctx, 0) + for { + select { + case <-ctx.Done(): + log.Debug("closing channel") + close(downloadedCh) + return + default: + } + toBlock := fromBlock + d.syncBlockChunkSize + if toBlock > lastBlock { + toBlock = lastBlock + } + if fromBlock > toBlock { + log.Debug("waiting for new blocks, last block ", toBlock) + lastBlock = d.waitForNewBlocks(ctx, toBlock) + continue + } + log.Debugf("getting events from blocks %d to %d", fromBlock, toBlock) + blocks := d.getEventsByBlockRange(ctx, fromBlock, toBlock) + for _, b := range blocks { + log.Debugf("sending block %d to the driver (with events)", b.Num) + downloadedCh <- b + } + if len(blocks) == 0 || 
blocks[len(blocks)-1].Num < toBlock { + // Indicate the last downloaded block if there are not events on it + log.Debugf("sending block %d to the driver (without evvents)", toBlock) + downloadedCh <- EVMBlock{ + EVMBlockHeader: d.getBlockHeader(ctx, toBlock), + } + } + fromBlock = toBlock + 1 + } +} + +type downloaderImplementation struct { + ethClient EthClienter + blockFinality *big.Int + waitForNewBlocksPeriod time.Duration + appender LogAppenderMap + topicsToQuery [][]common.Hash + adressessToQuery []common.Address +} + +func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) { + attempts := 0 + for { + header, err := d.ethClient.HeaderByNumber(ctx, d.blockFinality) + if err != nil { + attempts++ + log.Error("error geting last block num from eth client: ", err) + RetryHandler("waitForNewBlocks", attempts) + continue + } + if header.Number.Uint64() > lastBlockSeen { + return header.Number.Uint64() + } + time.Sleep(d.waitForNewBlocksPeriod) + } +} + +func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fromBlock, toBlock uint64) []EVMBlock { + blocks := []EVMBlock{} + logs := d.getLogs(ctx, fromBlock, toBlock) + for _, l := range logs { + if len(blocks) == 0 || blocks[len(blocks)-1].Num < l.BlockNumber { + b := d.getBlockHeader(ctx, l.BlockNumber) + blocks = append(blocks, EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ + Num: l.BlockNumber, + Hash: l.BlockHash, + Timestamp: b.Timestamp, + ParentHash: b.ParentHash, + }, + Events: []interface{}{}, + }) + } + + for { + attempts := 0 + err := d.appender[l.Topics[0]](&blocks[len(blocks)-1], l) + if err != nil { + attempts++ + log.Error("error trying to append log: ", err) + RetryHandler("getLogs", attempts) + continue + } + break + } + } + + return blocks +} + +func (d *downloaderImplementation) getLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log { + query := ethereum.FilterQuery{ + FromBlock: 
new(big.Int).SetUint64(fromBlock), + Addresses: d.adressessToQuery, + Topics: d.topicsToQuery, + ToBlock: new(big.Int).SetUint64(toBlock), + } + attempts := 0 + for { + logs, err := d.ethClient.FilterLogs(ctx, query) + if err != nil { + attempts++ + log.Error("error calling FilterLogs to eth client: ", err) + RetryHandler("getLogs", attempts) + continue + } + return logs + } +} + +func (d *downloaderImplementation) getBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader { + attempts := 0 + for { + header, err := d.ethClient.HeaderByNumber(ctx, big.NewInt(int64(blockNum))) + if err != nil { + attempts++ + log.Errorf("error getting block header for block %d, err: %v", blockNum, err) + RetryHandler("getBlockHeader", attempts) + continue + } + return EVMBlockHeader{ + Num: header.Number.Uint64(), + Hash: header.Hash(), + ParentHash: header.ParentHash, + Timestamp: header.Time, + } + } +} diff --git a/l1infotreesync/downloader_test.go b/sync/evmdownloader_test.go similarity index 72% rename from l1infotreesync/downloader_test.go rename to sync/evmdownloader_test.go index 28daf69b..c733aabd 100644 --- a/l1infotreesync/downloader_test.go +++ b/sync/evmdownloader_test.go @@ -1,9 +1,10 @@ -package l1infotreesync +package sync import ( "context" "errors" "math/big" + "strconv" "testing" "time" @@ -12,31 +13,33 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) var ( - contractAddr = common.HexToAddress("1234567890") + contractAddr = common.HexToAddress("f00") + eventSignature = crypto.Keccak256Hash([]byte("foo")) ) const ( syncBlockChunck = uint64(10) ) +type testEvent common.Hash + func TestGetEventsByBlockRange(t *testing.T) { type testCase struct { description string inputLogs []types.Log fromBlock, toBlock uint64 - 
expectedBlocks []block + expectedBlocks []EVMBlock } testCases := []testCase{} - clientMock := NewL2Mock(t) ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) + d, clientMock := NewTestDownloader(t) // case 0: single block, no events case0 := testCase{ @@ -44,25 +47,23 @@ func TestGetEventsByBlockRange(t *testing.T) { inputLogs: []types.Log{}, fromBlock: 1, toBlock: 3, - expectedBlocks: []block{}, + expectedBlocks: []EVMBlock{}, } testCases = append(testCases, case0) // case 1: single block, single event - logC1, updateC1 := generateUpdateL1InfoTree(t, 3) + logC1, updateC1 := generateEvent(3) logsC1 := []types.Log{ *logC1, } - blocksC1 := []block{ + blocksC1 := []EVMBlock{ { - blockHeader: blockHeader{ + EVMBlockHeader: EVMBlockHeader{ Num: logC1.BlockNumber, Hash: logC1.BlockHash, ParentHash: common.HexToHash("foo"), }, - Events: []L1InfoTreeUpdate{ - updateC1, - }, + Events: []interface{}{updateC1}, }, } case1 := testCase{ @@ -75,24 +76,24 @@ func TestGetEventsByBlockRange(t *testing.T) { testCases = append(testCases, case1) // case 2: single block, multiple events - logC2_1, updateC2_1 := generateUpdateL1InfoTree(t, 5) - logC2_2, updateC2_2 := generateUpdateL1InfoTree(t, 5) - logC2_3, updateC2_3 := generateUpdateL1InfoTree(t, 5) - logC2_4, updateC2_4 := generateUpdateL1InfoTree(t, 5) + logC2_1, updateC2_1 := generateEvent(5) + logC2_2, updateC2_2 := generateEvent(5) + logC2_3, updateC2_3 := generateEvent(5) + logC2_4, updateC2_4 := generateEvent(5) logsC2 := []types.Log{ *logC2_1, *logC2_2, *logC2_3, *logC2_4, } - blocksC2 := []block{ + blocksC2 := []EVMBlock{ { - blockHeader: blockHeader{ + EVMBlockHeader: EVMBlockHeader{ Num: logC2_1.BlockNumber, Hash: logC2_1.BlockHash, ParentHash: common.HexToHash("foo"), }, - Events: []L1InfoTreeUpdate{ + Events: []interface{}{ updateC2_1, updateC2_2, updateC2_3, @@ -110,35 +111,35 @@ func TestGetEventsByBlockRange(t *testing.T) { 
testCases = append(testCases, case2) // case 3: multiple blocks, some events - logC3_1, updateC3_1 := generateUpdateL1InfoTree(t, 7) - logC3_2, updateC3_2 := generateUpdateL1InfoTree(t, 7) - logC3_3, updateC3_3 := generateUpdateL1InfoTree(t, 8) - logC3_4, updateC3_4 := generateUpdateL1InfoTree(t, 8) + logC3_1, updateC3_1 := generateEvent(7) + logC3_2, updateC3_2 := generateEvent(7) + logC3_3, updateC3_3 := generateEvent(8) + logC3_4, updateC3_4 := generateEvent(8) logsC3 := []types.Log{ *logC3_1, *logC3_2, *logC3_3, *logC3_4, } - blocksC3 := []block{ + blocksC3 := []EVMBlock{ { - blockHeader: blockHeader{ + EVMBlockHeader: EVMBlockHeader{ Num: logC3_1.BlockNumber, Hash: logC3_1.BlockHash, ParentHash: common.HexToHash("foo"), }, - Events: []L1InfoTreeUpdate{ + Events: []interface{}{ updateC3_1, updateC3_2, }, }, { - blockHeader: blockHeader{ + EVMBlockHeader: EVMBlockHeader{ Num: logC3_3.BlockNumber, Hash: logC3_3.BlockHash, ParentHash: common.HexToHash("foo"), }, - Events: []L1InfoTreeUpdate{ + Events: []interface{}{ updateC3_3, updateC3_4, }, @@ -158,7 +159,7 @@ func TestGetEventsByBlockRange(t *testing.T) { FromBlock: new(big.Int).SetUint64(tc.fromBlock), Addresses: []common.Address{contractAddr}, Topics: [][]common.Hash{ - {updateL1InfoTreeSignature}, + {eventSignature}, }, ToBlock: new(big.Int).SetUint64(tc.toBlock), } @@ -179,26 +180,19 @@ func TestGetEventsByBlockRange(t *testing.T) { } } -func generateUpdateL1InfoTree(t *testing.T, blockNum uint32) (*types.Log, L1InfoTreeUpdate) { - b := L1InfoTreeUpdate{ - MainnetExitRoot: common.BigToHash(big.NewInt(int64(blockNum))), - RollupExitRoot: common.BigToHash(big.NewInt(int64(blockNum))), - } - var rollup, mainnet [32]byte - mainnet = b.MainnetExitRoot - rollup = b.RollupExitRoot +func generateEvent(blockNum uint32) (*types.Log, testEvent) { + h := common.HexToHash(strconv.Itoa(int(blockNum))) log := &types.Log{ Address: contractAddr, BlockNumber: uint64(blockNum), - BlockHash: 
common.BytesToHash(uint64ToBytes(uint64(blockNum))), + BlockHash: h, Topics: []common.Hash{ - updateL1InfoTreeSignature, - mainnet, - rollup, + eventSignature, + h, }, Data: nil, } - return log, b + return log, testEvent(h) } func TestDownload(t *testing.T) { @@ -206,31 +200,29 @@ func TestDownload(t *testing.T) { NOTE: due to the concurrent nature of this test (the function being tested runs through a goroutine) if the mock doesn't match, the goroutine will get stuck and the test will timeout */ - d := NewDownloaderMock(t) - downloadCh := make(chan block, 1) + d := NewEVMDownloaderMock(t) + downloadCh := make(chan EVMBlock, 1) ctx := context.Background() ctx1, cancel := context.WithCancel(ctx) - expectedBlocks := []block{} - clientMock := NewL2Mock(t) - dwnldr, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) - dwnldr.downloaderInterface = d + expectedBlocks := []EVMBlock{} + dwnldr, _ := NewTestDownloader(t) + dwnldr.evmDownloaderInterface = d d.On("waitForNewBlocks", mock.Anything, uint64(0)). Return(uint64(1)) // iteratiion 0: // last block is 1, download that block (no events and wait) - b1 := block{ - blockHeader: blockHeader{ + b1 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 1, Hash: common.HexToHash("01"), }, } expectedBlocks = append(expectedBlocks, b1) d.On("getEventsByBlockRange", mock.Anything, uint64(0), uint64(1)). - Return([]block{}) + Return([]EVMBlock{}) d.On("getBlockHeader", mock.Anything, uint64(1)). - Return(b1.blockHeader) + Return(b1.EVMBlockHeader) // iteration 1: wait for next block to be created d.On("waitForNewBlocks", mock.Anything, uint64(1)). 
@@ -238,15 +230,15 @@ func TestDownload(t *testing.T) { Return(uint64(2)).Once() // iteration 2: block 2 has events - b2 := block{ - blockHeader: blockHeader{ + b2 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 2, Hash: common.HexToHash("02"), }, } expectedBlocks = append(expectedBlocks, b2) d.On("getEventsByBlockRange", mock.Anything, uint64(2), uint64(2)). - Return([]block{b2}) + Return([]EVMBlock{b2}) // iteration 3: wait for next block to be created (jump to block 8) d.On("waitForNewBlocks", mock.Anything, uint64(2)). @@ -254,35 +246,31 @@ func TestDownload(t *testing.T) { Return(uint64(8)).Once() // iteration 4: blocks 6 and 7 have events - b6 := block{ - blockHeader: blockHeader{ + b6 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 6, Hash: common.HexToHash("06"), }, - Events: []L1InfoTreeUpdate{ - {RollupExitRoot: common.HexToHash("06")}, - }, + Events: []interface{}{"06"}, } - b7 := block{ - blockHeader: blockHeader{ + b7 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 7, Hash: common.HexToHash("07"), }, - Events: []L1InfoTreeUpdate{ - {MainnetExitRoot: common.HexToHash("07")}, - }, + Events: []interface{}{"07"}, } - b8 := block{ - blockHeader: blockHeader{ + b8 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 8, Hash: common.HexToHash("08"), }, } expectedBlocks = append(expectedBlocks, b6, b7, b8) d.On("getEventsByBlockRange", mock.Anything, uint64(3), uint64(8)). - Return([]block{b6, b7}) + Return([]EVMBlock{b6, b7}) d.On("getBlockHeader", mock.Anything, uint64(8)). - Return(b8.blockHeader) + Return(b8.EVMBlockHeader) // iteration 5: wait for next block to be created (jump to block 30) d.On("waitForNewBlocks", mock.Anything, uint64(8)). 
@@ -290,31 +278,29 @@ func TestDownload(t *testing.T) { Return(uint64(30)).Once() // iteration 6: from block 9 to 19, no events - b19 := block{ - blockHeader: blockHeader{ + b19 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 19, Hash: common.HexToHash("19"), }, } expectedBlocks = append(expectedBlocks, b19) d.On("getEventsByBlockRange", mock.Anything, uint64(9), uint64(19)). - Return([]block{}) + Return([]EVMBlock{}) d.On("getBlockHeader", mock.Anything, uint64(19)). - Return(b19.blockHeader) + Return(b19.EVMBlockHeader) // iteration 7: from block 20 to 30, events on last block - b30 := block{ - blockHeader: blockHeader{ + b30 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 30, Hash: common.HexToHash("30"), }, - Events: []L1InfoTreeUpdate{ - {RollupExitRoot: common.HexToHash("30")}, - }, + Events: []interface{}{testEvent(common.HexToHash("30"))}, } expectedBlocks = append(expectedBlocks, b30) d.On("getEventsByBlockRange", mock.Anything, uint64(20), uint64(30)). - Return([]block{b30}) + Return([]EVMBlock{b30}) // iteration 8: wait for next block to be created (jump to block 35) d.On("waitForNewBlocks", mock.Anything, uint64(30)). 
@@ -334,11 +320,9 @@ func TestDownload(t *testing.T) { } func TestWaitForNewBlocks(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - clientMock := NewL2Mock(t) + RetryAfterErrorPeriod = time.Millisecond * 100 ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) + d, clientMock := NewTestDownloader(t) // at first attempt currentBlock := uint64(5) @@ -369,18 +353,16 @@ func TestWaitForNewBlocks(t *testing.T) { } func TestGetBlockHeader(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 - clientMock := NewL2Mock(t) + RetryAfterErrorPeriod = time.Millisecond * 100 ctx := context.Background() - d, err := newDownloader(contractAddr, clientMock, syncBlockChunck, etherman.LatestBlock) - require.NoError(t, err) + d, clientMock := NewTestDownloader(t) blockNum := uint64(5) blockNumBig := big.NewInt(5) returnedBlock := &types.Header{ Number: blockNumBig, } - expectedBlock := blockHeader{ + expectedBlock := EVMBlockHeader{ Num: 5, Hash: returnedBlock.Hash(), } @@ -396,3 +378,19 @@ func TestGetBlockHeader(t *testing.T) { actualBlock = d.getBlockHeader(ctx, blockNum) assert.Equal(t, expectedBlock, actualBlock) } + +func buildAppender() LogAppenderMap { + appender := make(LogAppenderMap) + appender[eventSignature] = func(b *EVMBlock, l types.Log) error { + b.Events = append(b.Events, testEvent(l.Topics[1])) + return nil + } + return appender +} + +func NewTestDownloader(t *testing.T) (*EVMDownloader, *L2Mock) { + clientMock := NewL2Mock(t) + d, err := NewEVMDownloader(clientMock, syncBlockChunck, etherman.LatestBlock, time.Millisecond, buildAppender(), []common.Address{contractAddr}) + require.NoError(t, err) + return d, clientMock +} diff --git a/localbridgesync/driver.go b/sync/evmdriver.go similarity index 54% rename from localbridgesync/driver.go rename to sync/evmdriver.go index eaeed1c7..0363e00c 100644 --- a/localbridgesync/driver.go +++ 
b/sync/evmdriver.go @@ -1,4 +1,4 @@ -package localbridgesync +package sync import ( "context" @@ -8,27 +8,24 @@ import ( "github.com/ethereum/go-ethereum/common" ) -const ( - downloadBufferSize = 1000 - reorgDetectorID = "localbridgesync" -) - -type downloaderFull interface { - downloaderInterface - download(ctx context.Context, fromBlock uint64, downloadedCh chan block) +type evmDownloaderFull interface { + evmDownloaderInterface + download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) } -type driver struct { - reorgDetector ReorgDetector - reorgSub *reorgdetector.Subscription - processor processorInterface - downloader downloaderFull +type EVMDriver struct { + reorgDetector ReorgDetector + reorgSub *reorgdetector.Subscription + processor processorInterface + downloader evmDownloaderFull + reorgDetectorID string + downloadBufferSize int } type processorInterface interface { - getLastProcessedBlock(ctx context.Context) (uint64, error) - storeBridgeEvents(blockNum uint64, events []BridgeEvent) error - reorg(firstReorgedBlock uint64) error + GetLastProcessedBlock(ctx context.Context) (uint64, error) + ProcessBlock(block EVMBlock) error + Reorg(firstReorgedBlock uint64) error } type ReorgDetector interface { @@ -36,24 +33,28 @@ type ReorgDetector interface { AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error } -func newDriver( +func NewEVMDriver( reorgDetector ReorgDetector, processor processorInterface, - downloader downloaderFull, -) (*driver, error) { + downloader evmDownloaderFull, + reorgDetectorID string, + downloadBufferSize int, +) (*EVMDriver, error) { reorgSub, err := reorgDetector.Subscribe(reorgDetectorID) if err != nil { return nil, err } - return &driver{ - reorgDetector: reorgDetector, - reorgSub: reorgSub, - processor: processor, - downloader: downloader, + return &EVMDriver{ + reorgDetector: reorgDetector, + reorgSub: reorgSub, + processor: processor, + downloader: downloader, + 
reorgDetectorID: reorgDetectorID, + downloadBufferSize: downloadBufferSize, }, nil } -func (d *driver) Sync(ctx context.Context) { +func (d *EVMDriver) Sync(ctx context.Context) { reset: var ( lastProcessedBlock uint64 @@ -61,11 +62,11 @@ reset: err error ) for { - lastProcessedBlock, err = d.processor.getLastProcessedBlock(ctx) + lastProcessedBlock, err = d.processor.GetLastProcessedBlock(ctx) if err != nil { attempts++ log.Error("error geting last processed block: ", err) - retryHandler("Sync", attempts) + RetryHandler("Sync", attempts) continue } break @@ -74,7 +75,7 @@ reset: defer cancel() // start downloading - downloadCh := make(chan block, downloadBufferSize) + downloadCh := make(chan EVMBlock, d.downloadBufferSize) go d.downloader.download(cancellableCtx, lastProcessedBlock, downloadCh) for { @@ -90,33 +91,33 @@ reset: } } -func (d *driver) handleNewBlock(ctx context.Context, b block) { +func (d *EVMDriver) handleNewBlock(ctx context.Context, b EVMBlock) { attempts := 0 for { - err := d.reorgDetector.AddBlockToTrack(ctx, reorgDetectorID, b.Num, b.Hash) + err := d.reorgDetector.AddBlockToTrack(ctx, d.reorgDetectorID, b.Num, b.Hash) if err != nil { attempts++ log.Errorf("error adding block %d to tracker: %v", b.Num, err) - retryHandler("handleNewBlock", attempts) + RetryHandler("handleNewBlock", attempts) continue } break } attempts = 0 for { - err := d.processor.storeBridgeEvents(b.Num, b.Events) + err := d.processor.ProcessBlock(b) if err != nil { attempts++ log.Errorf("error processing events for blcok %d, err: ", b.Num, err) - retryHandler("handleNewBlock", attempts) + RetryHandler("handleNewBlock", attempts) continue } break } } -func (d *driver) handleReorg( - cancel context.CancelFunc, downloadCh chan block, firstReorgedBlock uint64, +func (d *EVMDriver) handleReorg( + cancel context.CancelFunc, downloadCh chan EVMBlock, firstReorgedBlock uint64, ) { // stop downloader cancel() @@ -127,14 +128,14 @@ func (d *driver) handleReorg( // handle reorg 
attempts := 0 for { - err := d.processor.reorg(firstReorgedBlock) + err := d.processor.Reorg(firstReorgedBlock) if err != nil { attempts++ log.Errorf( - "error processing reorg, last valid block %d, err: %v", + "error processing reorg, last valid Block %d, err: %v", firstReorgedBlock, err, ) - retryHandler("handleReorg", attempts) + RetryHandler("handleReorg", attempts) continue } break diff --git a/localbridgesync/driver_test.go b/sync/evmdriver_test.go similarity index 77% rename from localbridgesync/driver_test.go rename to sync/evmdriver_test.go index 543542f7..1f063cdb 100644 --- a/localbridgesync/driver_test.go +++ b/sync/evmdriver_test.go @@ -1,4 +1,4 @@ -package localbridgesync +package sync import ( "context" @@ -14,28 +14,32 @@ import ( "github.com/stretchr/testify/require" ) +var ( + reorgDetectorID = "foo" +) + func TestSync(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 + RetryAfterErrorPeriod = time.Millisecond * 100 rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) + dm := NewEVMDownloaderMock(t) firstReorgedBlock := make(chan uint64) reorgProcessed := make(chan bool) rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ FirstReorgedBlock: firstReorgedBlock, ReorgProcessed: reorgProcessed, - }) - driver, err := newDriver(rdm, pm, dm) + }, nil) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) require.NoError(t, err) ctx := context.Background() - expectedBlock1 := block{ - blockHeader: blockHeader{ + expectedBlock1 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 3, Hash: common.HexToHash("03"), }, } - expectedBlock2 := block{ - blockHeader: blockHeader{ + expectedBlock2 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 9, Hash: common.HexToHash("09"), }, @@ -47,7 +51,7 @@ func TestSync(t *testing.T) { reorg1Completed := reorgSemaphore{} dm.On("download", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { ctx := 
args.Get(0).(context.Context) - downloadedCh := args.Get(2).(chan block) + downloadedCh := args.Get(2).(chan EVMBlock) log.Info("entering mock loop") for { select { @@ -70,22 +74,22 @@ func TestSync(t *testing.T) { }) // Mocking this actions, the driver should "store" all the blocks from the downloader - pm.On("getLastProcessedBlock", ctx). + pm.On("GetLastProcessedBlock", ctx). Return(uint64(3), nil) rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock1.Num, expectedBlock1.Hash). Return(nil) - pm.On("storeBridgeEvents", expectedBlock1.Num, expectedBlock1.Events). + pm.On("ProcessBlock", expectedBlock1). Return(nil) rdm.On("AddBlockToTrack", ctx, reorgDetectorID, expectedBlock2.Num, expectedBlock2.Hash). Return(nil) - pm.On("storeBridgeEvents", expectedBlock2.Num, expectedBlock2.Events). + pm.On("ProcessBlock", expectedBlock2). Return(nil) go driver.Sync(ctx) time.Sleep(time.Millisecond * 200) // time to download expectedBlock1 // Trigger reorg 1 reorgedBlock1 := uint64(5) - pm.On("reorg", reorgedBlock1).Return(nil) + pm.On("Reorg", reorgedBlock1).Return(nil) firstReorgedBlock <- reorgedBlock1 ok := <-reorgProcessed require.True(t, ok) @@ -96,25 +100,25 @@ func TestSync(t *testing.T) { // Trigger reorg 2: syncer restarts the porcess reorgedBlock2 := uint64(7) - pm.On("reorg", reorgedBlock2).Return(nil) + pm.On("Reorg", reorgedBlock2).Return(nil) firstReorgedBlock <- reorgedBlock2 ok = <-reorgProcessed require.True(t, ok) } func TestHandleNewBlock(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 + RetryAfterErrorPeriod = time.Millisecond * 100 rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) - rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}) - driver, err := newDriver(rdm, pm, dm) + dm := NewEVMDownloaderMock(t) + rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}, nil) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) require.NoError(t, err) 
ctx := context.Background() // happy path - b1 := block{ - blockHeader: blockHeader{ + b1 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 1, Hash: common.HexToHash("f00"), }, @@ -122,13 +126,13 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b1.Num, b1.Hash). Return(nil) - pm.On("storeBridgeEvents", b1.Num, b1.Events). + pm.On("ProcessBlock", b1). Return(nil) driver.handleNewBlock(ctx, b1) // reorg deteector fails once - b2 := block{ - blockHeader: blockHeader{ + b2 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 2, Hash: common.HexToHash("f00"), }, @@ -139,13 +143,13 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b2.Num, b2.Hash). Return(nil).Once() - pm.On("storeBridgeEvents", b2.Num, b2.Events). + pm.On("ProcessBlock", b2). Return(nil) driver.handleNewBlock(ctx, b2) // processor fails once - b3 := block{ - blockHeader: blockHeader{ + b3 := EVMBlock{ + EVMBlockHeader: EVMBlockHeader{ Num: 3, Hash: common.HexToHash("f00"), }, @@ -153,30 +157,30 @@ func TestHandleNewBlock(t *testing.T) { rdm. On("AddBlockToTrack", ctx, reorgDetectorID, b3.Num, b3.Hash). Return(nil) - pm.On("storeBridgeEvents", b3.Num, b3.Events). + pm.On("ProcessBlock", b3). Return(errors.New("foo")).Once() - pm.On("storeBridgeEvents", b3.Num, b3.Events). + pm.On("ProcessBlock", b3). 
Return(nil).Once() driver.handleNewBlock(ctx, b3) } func TestHandleReorg(t *testing.T) { - retryAfterErrorPeriod = time.Millisecond * 100 + RetryAfterErrorPeriod = time.Millisecond * 100 rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) - dm := NewDownloaderMock(t) + dm := NewEVMDownloaderMock(t) reorgProcessed := make(chan bool) rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ ReorgProcessed: reorgProcessed, - }) - driver, err := newDriver(rdm, pm, dm) + }, nil) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) require.NoError(t, err) ctx := context.Background() // happy path _, cancel := context.WithCancel(ctx) - downloadCh := make(chan block) + downloadCh := make(chan EVMBlock) firstReorgedBlock := uint64(5) pm.On("reorg", firstReorgedBlock).Return(nil) go driver.handleReorg(cancel, downloadCh, firstReorgedBlock) @@ -186,20 +190,20 @@ func TestHandleReorg(t *testing.T) { // download ch sends some garbage _, cancel = context.WithCancel(ctx) - downloadCh = make(chan block) + downloadCh = make(chan EVMBlock) firstReorgedBlock = uint64(6) pm.On("reorg", firstReorgedBlock).Return(nil) go driver.handleReorg(cancel, downloadCh, firstReorgedBlock) - downloadCh <- block{} - downloadCh <- block{} - downloadCh <- block{} + downloadCh <- EVMBlock{} + downloadCh <- EVMBlock{} + downloadCh <- EVMBlock{} close(downloadCh) done = <-reorgProcessed require.True(t, done) // processor fails 2 times _, cancel = context.WithCancel(ctx) - downloadCh = make(chan block) + downloadCh = make(chan EVMBlock) firstReorgedBlock = uint64(7) pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once() pm.On("reorg", firstReorgedBlock).Return(errors.New("foo")).Once() diff --git a/sync/evmtypes.go b/sync/evmtypes.go new file mode 100644 index 00000000..d242dbc4 --- /dev/null +++ b/sync/evmtypes.go @@ -0,0 +1,15 @@ +package sync + +import "github.com/ethereum/go-ethereum/common" + +type EVMBlock struct { + EVMBlockHeader + Events []interface{} +} 
+ +type EVMBlockHeader struct { + Num uint64 + Hash common.Hash + ParentHash common.Hash + Timestamp uint64 +} diff --git a/l1infotreesync/mock_downloader_test.go b/sync/mock_downloader_test.go similarity index 54% rename from l1infotreesync/mock_downloader_test.go rename to sync/mock_downloader_test.go index fca72678..738fc873 100644 --- a/l1infotreesync/mock_downloader_test.go +++ b/sync/mock_downloader_test.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.22.1. DO NOT EDIT. -package l1infotreesync +package sync import ( context "context" @@ -9,45 +9,40 @@ import ( mock "github.com/stretchr/testify/mock" ) -// DownloaderMock is an autogenerated mock type for the downloaderFull type -type DownloaderMock struct { +// EVMDownloaderMock is an autogenerated mock type for the evmDownloaderFull type +type EVMDownloaderMock struct { mock.Mock } -// appendLog provides a mock function with given fields: b, l -func (_m *DownloaderMock) appendLog(b *block, l types.Log) { - _m.Called(b, l) -} - // download provides a mock function with given fields: ctx, fromBlock, downloadedCh -func (_m *DownloaderMock) download(ctx context.Context, fromBlock uint64, downloadedCh chan block) { +func (_m *EVMDownloaderMock) download(ctx context.Context, fromBlock uint64, downloadedCh chan EVMBlock) { _m.Called(ctx, fromBlock, downloadedCh) } // getBlockHeader provides a mock function with given fields: ctx, blockNum -func (_m *DownloaderMock) getBlockHeader(ctx context.Context, blockNum uint64) blockHeader { +func (_m *EVMDownloaderMock) getBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader { ret := _m.Called(ctx, blockNum) - var r0 blockHeader - if rf, ok := ret.Get(0).(func(context.Context, uint64) blockHeader); ok { + var r0 EVMBlockHeader + if rf, ok := ret.Get(0).(func(context.Context, uint64) EVMBlockHeader); ok { r0 = rf(ctx, blockNum) } else { - r0 = ret.Get(0).(blockHeader) + r0 = ret.Get(0).(EVMBlockHeader) } return r0 } // getEventsByBlockRange provides a mock 
function with given fields: ctx, fromBlock, toBlock -func (_m *DownloaderMock) getEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []block { +func (_m *EVMDownloaderMock) getEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []EVMBlock { ret := _m.Called(ctx, fromBlock, toBlock) - var r0 []block - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []block); ok { + var r0 []EVMBlock + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []EVMBlock); ok { r0 = rf(ctx, fromBlock, toBlock) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]block) + r0 = ret.Get(0).([]EVMBlock) } } @@ -55,7 +50,7 @@ func (_m *DownloaderMock) getEventsByBlockRange(ctx context.Context, fromBlock u } // getLogs provides a mock function with given fields: ctx, fromBlock, toBlock -func (_m *DownloaderMock) getLogs(ctx context.Context, fromBlock uint64, toBlock uint64) []types.Log { +func (_m *EVMDownloaderMock) getLogs(ctx context.Context, fromBlock uint64, toBlock uint64) []types.Log { ret := _m.Called(ctx, fromBlock, toBlock) var r0 []types.Log @@ -71,7 +66,7 @@ func (_m *DownloaderMock) getLogs(ctx context.Context, fromBlock uint64, toBlock } // waitForNewBlocks provides a mock function with given fields: ctx, lastBlockSeen -func (_m *DownloaderMock) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) uint64 { +func (_m *EVMDownloaderMock) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) uint64 { ret := _m.Called(ctx, lastBlockSeen) var r0 uint64 @@ -84,14 +79,14 @@ func (_m *DownloaderMock) waitForNewBlocks(ctx context.Context, lastBlockSeen ui return r0 } -type mockConstructorTestingTNewDownloaderMock interface { +type mockConstructorTestingTNewEVMDownloaderMock interface { mock.TestingT Cleanup(func()) } -// NewDownloaderMock creates a new instance of DownloaderMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
-func NewDownloaderMock(t mockConstructorTestingTNewDownloaderMock) *DownloaderMock { - mock := &DownloaderMock{} +// NewEVMDownloaderMock creates a new instance of EVMDownloaderMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEVMDownloaderMock(t mockConstructorTestingTNewEVMDownloaderMock) *EVMDownloaderMock { + mock := &EVMDownloaderMock{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/l1infotreesync/mock_l2_test.go b/sync/mock_l2_test.go similarity index 99% rename from l1infotreesync/mock_l2_test.go rename to sync/mock_l2_test.go index 9ab6868d..0d1e03da 100644 --- a/l1infotreesync/mock_l2_test.go +++ b/sync/mock_l2_test.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.22.1. DO NOT EDIT. -package l1infotreesync +package sync import ( context "context" diff --git a/l1infotreesync/mock_processor_test.go b/sync/mock_processor_test.go similarity index 73% rename from l1infotreesync/mock_processor_test.go rename to sync/mock_processor_test.go index f7105850..f0959b29 100644 --- a/l1infotreesync/mock_processor_test.go +++ b/sync/mock_processor_test.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.22.1. DO NOT EDIT. 
-package l1infotreesync +package sync import ( context "context" @@ -13,8 +13,8 @@ type ProcessorMock struct { mock.Mock } -// getLastProcessedBlock provides a mock function with given fields: ctx -func (_m *ProcessorMock) getLastProcessedBlock(ctx context.Context) (uint64, error) { +// GetLastProcessedBlock provides a mock function with given fields: ctx +func (_m *ProcessorMock) GetLastProcessedBlock(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) var r0 uint64 @@ -37,13 +37,13 @@ func (_m *ProcessorMock) getLastProcessedBlock(ctx context.Context) (uint64, err return r0, r1 } -// processBlock provides a mock function with given fields: block -func (_m *ProcessorMock) processBlock(b block) error { - ret := _m.Called(b) +// ProcessBlock provides a mock function with given fields: block +func (_m *ProcessorMock) ProcessBlock(block EVMBlock) error { + ret := _m.Called(block) var r0 error - if rf, ok := ret.Get(0).(func(block) error); ok { - r0 = rf(b) + if rf, ok := ret.Get(0).(func(EVMBlock) error); ok { + r0 = rf(block) } else { r0 = ret.Error(0) } @@ -51,8 +51,8 @@ func (_m *ProcessorMock) processBlock(b block) error { return r0 } -// reorg provides a mock function with given fields: firstReorgedBlock -func (_m *ProcessorMock) reorg(firstReorgedBlock uint64) error { +// Reorg provides a mock function with given fields: firstReorgedBlock +func (_m *ProcessorMock) Reorg(firstReorgedBlock uint64) error { ret := _m.Called(firstReorgedBlock) var r0 error diff --git a/localbridgesync/mock_reorgdetector_test.go b/sync/mock_reorgdetector_test.go similarity index 98% rename from localbridgesync/mock_reorgdetector_test.go rename to sync/mock_reorgdetector_test.go index 3639cb9a..056da2a1 100644 --- a/localbridgesync/mock_reorgdetector_test.go +++ b/sync/mock_reorgdetector_test.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.22.1. DO NOT EDIT. 
-package localbridgesync +package sync import ( context "context" diff --git a/test/Makefile b/test/Makefile index 5504c31c..499c467a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,15 +1,9 @@ .PHONY: generate-mocks generate-mocks: - $(MAKE) generate-mocks-localbridgesync $(MAKE) generate-mocks-reorgdetector $(MAKE) generate-mocks-l1infotreesync - -.PHONY: generate-mocks-localbridgesync -generate-mocks-localbridgesync: ## Generates mocks for localbridgesync, using mockery tool - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClienter --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=L2Mock --filename=mock_l2_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=downloaderFull --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=DownloaderMock --filename=mock_downloader_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=processorInterface --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ProcessorMock --filename=mock_processor_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../localbridgesync --output=../localbridgesync --outpkg=localbridgesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go + $(MAKE) generate-mocks-aggoracle + $(MAKE) generate-mocks-sync .PHONY: generate-mocks-reorgdetector generate-mocks-reorgdetector: ## Generates mocks for reorgdetector, using mockery tool @@ -17,11 +11,15 @@ generate-mocks-reorgdetector: ## Generates mocks for reorgdetector, using mocker .PHONY: generate-mocks-l1infotreesync generate-mocks-l1infotreesync: ## Generates mocks for l1infotreesync , using mockery tool - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClienter --dir=../l1infotreesync 
--output=../l1infotreesync --outpkg=l1infotreesync --inpackage --structname=L2Mock --filename=mock_l2_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=downloaderFull --dir=../l1infotreesync --output=../l1infotreesync --outpkg=l1infotreesync --inpackage --structname=DownloaderMock --filename=mock_downloader_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=processorInterface --dir=../l1infotreesync --output=../l1infotreesync --outpkg=l1infotreesync --inpackage --structname=ProcessorMock --filename=mock_processor_test.go - export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../l1infotreesync --output=../l1infotreesync --outpkg=l1infotreesync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../sync --output=../l1infotreesync --outpkg=l1infotreesync --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go .PHONY: generate-mocks-aggoracle generate-mocks-aggoracle: ## Generates mocks for aggoracle , using mockery tool export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthTxManager --dir=../aggoracle/chaingersender --output=../aggoracle --outpkg=aggoracle --structname=EthTxManagerMock --filename=mock_ethtxmanager_test.go + +.PHONY: generate-mocks-sync +generate-mocks-sync: ## Generates mocks for sync, using mockery tool + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=EthClienter --dir=../sync --output=../sync --outpkg=sync --inpackage --structname=L2Mock --filename=mock_l2_test.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=evmDownloaderFull --dir=../sync --output=../sync --outpkg=sync --inpackage --structname=EVMDownloaderMock --filename=mock_downloader_test.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery 
--name=processorInterface --dir=../sync --output=../sync --outpkg=sync --inpackage --structname=ProcessorMock --filename=mock_processor_test.go + export "GOROOT=$$(go env GOROOT)" && $$(go env GOPATH)/bin/mockery --name=ReorgDetector --dir=../sync --output=../sync --outpkg=sync --inpackage --structname=ReorgDetectorMock --filename=mock_reorgdetector_test.go