diff --git a/aggoracle/chaingersender/evm.go b/aggoracle/chaingersender/evm.go index 93ce347c..859f4b8b 100644 --- a/aggoracle/chaingersender/evm.go +++ b/aggoracle/chaingersender/evm.go @@ -41,13 +41,13 @@ type EVMChainGERSender struct { } type EVMConfig struct { - GlobalExitRootL2 common.Address `mapstructure:"GlobalExitRootL2"` - URLRPCL2 string `mapstructure:"URLRPCL2"` - ChainIDL2 uint64 `mapstructure:"ChainIDL2"` - GasOffset uint64 `mapstructure:"GasOffset"` - WaitPeriodMonitorTx cfgTypes.Duration `mapstructure:"WaitPeriodMonitorTx"` - SenderAddr common.Address `mapstructure:"SenderAddr"` - EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"` + GlobalExitRootL2Addr common.Address `mapstructure:"GlobalExitRootL2"` + URLRPCL2 string `mapstructure:"URLRPCL2"` + ChainIDL2 uint64 `mapstructure:"ChainIDL2"` + GasOffset uint64 `mapstructure:"GasOffset"` + WaitPeriodMonitorTx cfgTypes.Duration `mapstructure:"WaitPeriodMonitorTx"` + SenderAddr common.Address `mapstructure:"SenderAddr"` + EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"` } func NewEVMChainGERSender( diff --git a/aggoracle/e2e_test.go b/aggoracle/e2e_test.go index 3e09c906..ce081c35 100644 --- a/aggoracle/e2e_test.go +++ b/aggoracle/e2e_test.go @@ -60,7 +60,7 @@ func commonSetup(t *testing.T) ( require.NoError(t, err) // Syncer dbPathSyncer := t.TempDir() - syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0) + syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3) require.NoError(t, err) go syncer.Start(ctx) diff --git a/aggoracle/oracle.go b/aggoracle/oracle.go index 49d14b7e..f22ee1f0 100644 --- a/aggoracle/oracle.go +++ b/aggoracle/oracle.go @@ -78,12 +78,12 @@ func (a *AggOracle) Start(ctx context.Context) { log.Debugf("GER %s already injected", gerToInject.Hex()) continue } - log.Debugf("injecting new GER: %s", gerToInject.Hex()) + log.Infof("injecting new GER: %s", gerToInject.Hex()) if err := a.chainSender.UpdateGERWaitUntilMined(ctx, gerToInject); err != nil { log.Errorf("error calling updateGERWaitUntilMined, when trying to inject GER %s: %v", gerToInject.Hex(), err) continue } - log.Debugf("GER %s injected", gerToInject.Hex()) + log.Infof("GER %s injected", gerToInject.Hex()) case <-ctx.Done(): return } diff --git a/cmd/run.go b/cmd/run.go index 4bb24fc4..f7d36b74 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -217,7 +217,7 @@ func createAggoracle(cfg config.Config, l1Client *ethclient.Client, syncer *l1in log.Fatal(err) } sender, err = chaingersender.NewEVMChainGERSender( - cfg.AggOracle.EVMSender.GlobalExitRootL2, + cfg.AggOracle.EVMSender.GlobalExitRootL2Addr, cfg.AggOracle.EVMSender.SenderAddr, l2CLient, ethTxManager, @@ -388,6 +388,8 @@ func newL1InfoTreeSyncer( l1Client, cfg.L1InfoTreeSync.WaitForNewBlocksPeriod.Duration, cfg.L1InfoTreeSync.InitialBlock, + cfg.L1InfoTreeSync.RetryAfterErrorPeriod.Duration, + cfg.L1InfoTreeSync.MaxRetryAttemptsAfterError, ) if err != nil { log.Fatal(err) diff --git a/config/default.go b/config/default.go index 7b12e6f3..85dbd1cd 100644 --- a/config/default.go +++ b/config/default.go @@ -118,4 +118,49 @@ SequencerPrivateKey = {} [Aggregator.Synchronizer.Etherman] [Aggregator.Synchronizer.Etherman.Validium] Enabled = false + +[ReorgDetectorL1] +DBPath = "/tmp/reorgdetector" + +[L1InfoTreeSync] +DBPath = "/tmp/L1InfoTreeSync" +GlobalExitRootAddr="0x8464135c8F25Da09e49BC8782676a84730C318bC" +SyncBlockChunkSize=10 +BlockFinality="latest" +URLRPCL1="http://test-aggoracle-l1:8545" +WaitForNewBlocksPeriod="100ms" +InitialBlock=0 + +[AggOracle] +TargetChainType="EVM" +URLRPCL1="http://test-aggoracle-l1:8545" +BlockFinality="latest" +WaitPeriodNextGER="100ms" + [EVMSender] + GlobalExitRootL2="0x8464135c8F25Da09e49BC8782676a84730C318bC" + URLRPCL2="http://test-aggoracle-l2:8545" + ChainIDL2=1337 + GasOffset=0 + WaitPeriodMonitorTx="100ms" + SenderAddr="0x70997970c51812dc3a010c7d01b50e0d17dc79c8" + [SequenceSender.EthTxManager] + FrequencyToMonitorTxs = "1s" + WaitTxToBeMined = "2s" + GetReceiptMaxTime = "250ms" + GetReceiptWaitInterval = "1s" + PrivateKeys = [ + {Path = "/app/keystore/aggoracle.keystore", Password = "testonly"}, + ] + ForcedGas = 0 + GasPriceMarginFactor = 1 + MaxGasPriceLimit = 0 + PersistenceFilename = "/tmp/ethtxmanager.json" + ReadPendingL1Txs = false + SafeStatusL1NumberOfBlocks = 5 + FinalizedStatusL1NumberOfBlocks = 10 + [SequenceSender.EthTxManager.Etherman] + URL = "http://test-aggoracle-l2" + MultiGasProvider = false + L1ChainID = 1337 + HTTPHeaders = [] ` diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index 35958f08..82bef733 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -54,7 +54,7 @@ func TestE2E(t *testing.T) { rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) client, gerAddr, gerSc, err := newSimulatedClient(auth) require.NoError(t, err) - syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0) + syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3) require.NoError(t, err) go syncer.Start(ctx) diff --git a/l1infotreesync/l1infotreesync.go b/l1infotreesync/l1infotreesync.go index ed9056bc..7b3be753 100644 --- a/l1infotreesync/l1infotreesync.go +++ b/l1infotreesync/l1infotreesync.go @@ -15,20 +15,17 @@ const ( downloadBufferSize = 1000 ) -var ( - retryAfterErrorPeriod = time.Second * 10 - maxRetryAttemptsAfterError = 5 -) - type Config struct { DBPath string `mapstructure:"DBPath"` GlobalExitRootAddr common.Address `mapstructure:"GlobalExitRootAddr"` SyncBlockChunkSize uint64 `mapstructure:"SyncBlockChunkSize"` // TODO: BlockFinality doesnt work as per the jsonschema - BlockFinality string `jsonschema:"enum=latest,enum=safe, enum=pending, enum=finalized" mapstructure:"BlockFinality"` - URLRPCL1 string `mapstructure:"URLRPCL1"` - WaitForNewBlocksPeriod types.Duration `mapstructure:"WaitForNewBlocksPeriod"` - InitialBlock uint64 `mapstructure:"InitialBlock"` + BlockFinality string `jsonschema:"enum=latest,enum=safe, enum=pending, enum=finalized" mapstructure:"BlockFinality"` + URLRPCL1 string `mapstructure:"URLRPCL1"` + WaitForNewBlocksPeriod types.Duration `mapstructure:"WaitForNewBlocksPeriod"` + InitialBlock uint64 `mapstructure:"InitialBlock"` + RetryAfterErrorPeriod types.Duration `mapstructure:"RetryAfterErrorPeriod"` + MaxRetryAttemptsAfterError int `mapstructure:"MaxRetryAttemptsAfterError"` } type L1InfoTreeSync struct { @@ -46,6 +43,8 @@ func New( l1Client EthClienter, waitForNewBlocksPeriod time.Duration, initialBlock uint64, + retryAfterErrorPeriod time.Duration, + maxRetryAttemptsAfterError int, ) (*L1InfoTreeSync, error) { processor, err := newProcessor(ctx, dbPath) if err != nil { @@ -56,14 +55,18 @@ func New( if err != nil { return nil, err } - if lastProcessedBlock < initialBlock { + if initialBlock > 0 && lastProcessedBlock < initialBlock-1 { err = processor.ProcessBlock(sync.Block{ - Num: initialBlock, + Num: initialBlock - 1, }) if err != nil { return nil, err } } + rh := &sync.RetryHandler{ + RetryAfterErrorPeriod: retryAfterErrorPeriod, + MaxRetryAttemptsAfterError: maxRetryAttemptsAfterError, + } appender, err := buildAppender(l1Client, globalExitRoot) if err != nil { @@ -76,12 +79,13 @@ func New( waitForNewBlocksPeriod, appender, []common.Address{globalExitRoot}, + rh, ) if err != nil { return nil, err } - driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize) + driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize, rh) if err != nil { return nil, err } diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index 9b444d43..35191062 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -122,7 +122,13 @@ func newProcessor(ctx context.Context, dbPath string) (*processor, error) { p := &processor{ db: db, } - leaves, err := p.getAllLeavesHashed(ctx) + + tx, err := p.db.BeginRo(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + leaves, err := p.getAllLeavesHashed(tx) if err != nil { return nil, err } @@ -134,14 +140,8 @@ func newProcessor(ctx context.Context, dbPath string) (*processor, error) { return p, nil } -func (p *processor) getAllLeavesHashed(ctx context.Context) ([][32]byte, error) { +func (p *processor) getAllLeavesHashed(tx kv.Tx) ([][32]byte, error) { // TODO: same coment about refactor that appears at ComputeMerkleProofByIndex - tx, err := p.db.BeginRo(ctx) - if err != nil { - return nil, err - } - defer tx.Rollback() - index, err := p.getLastIndex(tx) if err == ErrNotFound || index == 0 { return nil, nil @@ -323,7 +323,6 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) { } func (p *processor) Reorg(firstReorgedBlock uint64) error { - // TODO: Does tree need to be reorged? tx, err := p.db.BeginRw(context.Background()) if err != nil { return err @@ -359,6 +358,17 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error { tx.Rollback() return err } + leaves, err := p.getAllLeavesHashed(tx) + if err != nil { + tx.Rollback() + return err + } + tree, err := l1infotree.NewL1InfoTree(treeHeight, leaves) + if err != nil { + tx.Rollback() + return err + } + p.tree = tree return tx.Commit() } @@ -404,6 +414,7 @@ func (p *processor) ProcessBlock(b sync.Block) error { if err != nil { return err } + events := make([]Event, len(b.Events)) if len(b.Events) > 0 { var initialIndex uint32 lastIndex, err := p.getLastIndex(tx) @@ -417,6 +428,7 @@ func (p *processor) ProcessBlock(b sync.Block) error { } for i, e := range b.Events { event := e.(Event) + events = append(events, event) leafToStore := storeLeaf{ Index: initialIndex + uint32(i), MainnetExitRoot: event.MainnetExitRoot, @@ -448,7 +460,7 @@ func (p *processor) ProcessBlock(b sync.Block) error { tx.Rollback() return err } - log.Debugf("block %d processed with events: %+v", b.Num, b.Events) + log.Debugf("block %d processed with events: %+v", b.Num, events) return tx.Commit() } diff --git a/localbridgesync/localbridgesync.go b/localbridgesync/localbridgesync.go index d0617584..94e60b59 100644 --- a/localbridgesync/localbridgesync.go +++ b/localbridgesync/localbridgesync.go @@ -50,6 +50,10 @@ func New( return nil, err } } + rh := &sync.RetryHandler{ + MaxRetryAttemptsAfterError: maxRetryAttemptsAfterError, + RetryAfterErrorPeriod: retryAfterErrorPeriod, + } appender, err := buildAppender(l2Client, bridge) if err != nil { @@ -62,12 +66,13 @@ func New( waitForNewBlocksPeriod, appender, []common.Address{bridge}, + rh, ) if err != nil { return nil, err } - driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize) + driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize, rh) if err != nil { return nil, err } diff --git a/sync/common.go b/sync/common.go index 6d1011f5..4fe049d5 100644 --- a/sync/common.go +++ b/sync/common.go @@ -5,17 +5,17 @@ import ( "time" ) -var ( - RetryAfterErrorPeriod = time.Second * 10 - MaxRetryAttemptsAfterError = 5 -) +type RetryHandler struct { + RetryAfterErrorPeriod time.Duration + MaxRetryAttemptsAfterError int +} -func RetryHandler(funcName string, attempts int) { - if attempts >= MaxRetryAttemptsAfterError { +func (h *RetryHandler) Handle(funcName string, attempts int) { + if h.MaxRetryAttemptsAfterError > -1 && attempts >= h.MaxRetryAttemptsAfterError { log.Fatalf( "%s failed too many times (%d)", - funcName, MaxRetryAttemptsAfterError, + funcName, h.MaxRetryAttemptsAfterError, ) } - time.Sleep(RetryAfterErrorPeriod) + time.Sleep(h.RetryAfterErrorPeriod) } diff --git a/sync/evmdownloader.go b/sync/evmdownloader.go index ebdde880..ad452856 100644 --- a/sync/evmdownloader.go +++ b/sync/evmdownloader.go @@ -41,6 +41,7 @@ func NewEVMDownloader( waitForNewBlocksPeriod time.Duration, appender LogAppenderMap, adressessToQuery []common.Address, + rh *RetryHandler, ) (*EVMDownloader, error) { finality, err := blockFinalityType.ToBlockNum() if err != nil { @@ -59,6 +60,7 @@ func NewEVMDownloader( appender: appender, topicsToQuery: topicsToQuery, adressessToQuery: adressessToQuery, + rh: rh, }, }, nil } @@ -106,6 +108,7 @@ type downloaderImplementation struct { appender LogAppenderMap topicsToQuery [][]common.Hash adressessToQuery []common.Address + rh *RetryHandler } func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) { @@ -122,7 +125,7 @@ func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlo if err != nil { attempts++ log.Error("error getting last block num from eth client: ", err) - RetryHandler("waitForNewBlocks", attempts) + d.rh.Handle("waitForNewBlocks", attempts) continue } if header.Number.Uint64() > lastBlockSeen { @@ -155,7 +158,7 @@ func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fr if err != nil { attempts++ log.Error("error trying to append log: ", err) - RetryHandler("getLogs", attempts) + d.rh.Handle("getLogs", attempts) continue } break @@ -178,7 +181,7 @@ func (d *downloaderImplementation) getLogs(ctx context.Context, fromBlock, toBlo if err != nil { attempts++ log.Error("error calling FilterLogs to eth client: ", err) - RetryHandler("getLogs", attempts) + d.rh.Handle("getLogs", attempts) continue } return logs @@ -192,7 +195,7 @@ func (d *downloaderImplementation) getBlockHeader(ctx context.Context, blockNum if err != nil { attempts++ log.Errorf("error getting block header for block %d, err: %v", blockNum, err) - RetryHandler("getBlockHeader", attempts) + d.rh.Handle("getBlockHeader", attempts) continue } return EVMBlockHeader{ diff --git a/sync/evmdownloader_test.go b/sync/evmdownloader_test.go index c733aabd..2f5a7ee5 100644 --- a/sync/evmdownloader_test.go +++ b/sync/evmdownloader_test.go @@ -320,7 +320,6 @@ func TestDownload(t *testing.T) { } func TestWaitForNewBlocks(t *testing.T) { - RetryAfterErrorPeriod = time.Millisecond * 100 ctx := context.Background() d, clientMock := NewTestDownloader(t) @@ -353,7 +352,6 @@ func TestWaitForNewBlocks(t *testing.T) { } func TestGetBlockHeader(t *testing.T) { - RetryAfterErrorPeriod = time.Millisecond * 100 ctx := context.Background() d, clientMock := NewTestDownloader(t) @@ -389,8 +387,12 @@ func buildAppender() LogAppenderMap { } func NewTestDownloader(t *testing.T) (*EVMDownloader, *L2Mock) { + rh := &RetryHandler{ + MaxRetryAttemptsAfterError: 5, + RetryAfterErrorPeriod: time.Millisecond * 100, + } clientMock := NewL2Mock(t) - d, err := NewEVMDownloader(clientMock, syncBlockChunck, etherman.LatestBlock, time.Millisecond, buildAppender(), []common.Address{contractAddr}) + d, err := NewEVMDownloader(clientMock, syncBlockChunck, etherman.LatestBlock, time.Millisecond, buildAppender(), []common.Address{contractAddr}, rh) require.NoError(t, err) return d, clientMock } diff --git a/sync/evmdriver.go b/sync/evmdriver.go index a30b96d6..0e20731f 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -20,6 +20,7 @@ type EVMDriver struct { downloader evmDownloaderFull reorgDetectorID string downloadBufferSize int + rh *RetryHandler } type processorInterface interface { @@ -39,6 +40,7 @@ func NewEVMDriver( downloader evmDownloaderFull, reorgDetectorID string, downloadBufferSize int, + rh *RetryHandler, ) (*EVMDriver, error) { reorgSub, err := reorgDetector.Subscribe(reorgDetectorID) if err != nil { @@ -51,6 +53,7 @@ func NewEVMDriver( downloader: downloader, reorgDetectorID: reorgDetectorID, downloadBufferSize: downloadBufferSize, + rh: rh, }, nil } @@ -66,7 +69,7 @@ reset: if err != nil { attempts++ log.Error("error geting last processed block: ", err) - RetryHandler("Sync", attempts) + d.rh.Handle("Sync", attempts) continue } break @@ -98,7 +101,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, b EVMBlock) { if err != nil { attempts++ log.Errorf("error adding block %d to tracker: %v", b.Num, err) - RetryHandler("handleNewBlock", attempts) + d.rh.Handle("handleNewBlock", attempts) continue } break @@ -113,7 +116,7 @@ func (d *EVMDriver) handleNewBlock(ctx context.Context, b EVMBlock) { if err != nil { attempts++ log.Errorf("error processing events for blcok %d, err: ", b.Num, err) - RetryHandler("handleNewBlock", attempts) + d.rh.Handle("handleNewBlock", attempts) continue } break @@ -139,7 +142,7 @@ func (d *EVMDriver) handleReorg( "error processing reorg, last valid Block %d, err: %v", firstReorgedBlock, err, ) - RetryHandler("handleReorg", attempts) + d.rh.Handle("handleReorg", attempts) continue } break diff --git a/sync/evmdriver_test.go b/sync/evmdriver_test.go index 502722f6..853dda81 100644 --- a/sync/evmdriver_test.go +++ b/sync/evmdriver_test.go @@ -19,7 +19,10 @@ var ( ) func TestSync(t *testing.T) { - RetryAfterErrorPeriod = time.Millisecond * 100 + rh := &RetryHandler{ + MaxRetryAttemptsAfterError: 5, + RetryAfterErrorPeriod: time.Millisecond * 100, + } rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) dm := NewEVMDownloaderMock(t) @@ -29,7 +32,7 @@ func TestSync(t *testing.T) { FirstReorgedBlock: firstReorgedBlock, ReorgProcessed: reorgProcessed, }, nil) - driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10, rh) require.NoError(t, err) ctx := context.Background() expectedBlock1 := EVMBlock{ @@ -107,12 +110,15 @@ func TestSync(t *testing.T) { } func TestHandleNewBlock(t *testing.T) { - RetryAfterErrorPeriod = time.Millisecond * 100 + rh := &RetryHandler{ + MaxRetryAttemptsAfterError: 5, + RetryAfterErrorPeriod: time.Millisecond * 100, + } rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) dm := NewEVMDownloaderMock(t) rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{}, nil) - driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10, rh) require.NoError(t, err) ctx := context.Background() @@ -166,7 +172,10 @@ func TestHandleNewBlock(t *testing.T) { } func TestHandleReorg(t *testing.T) { - RetryAfterErrorPeriod = time.Millisecond * 100 + rh := &RetryHandler{ + MaxRetryAttemptsAfterError: 5, + RetryAfterErrorPeriod: time.Millisecond * 100, + } rdm := NewReorgDetectorMock(t) pm := NewProcessorMock(t) dm := NewEVMDownloaderMock(t) @@ -174,7 +183,7 @@ func TestHandleReorg(t *testing.T) { rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ ReorgProcessed: reorgProcessed, }, nil) - driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10) + driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10, rh) require.NoError(t, err) ctx := context.Background()