Skip to content

Commit

Permalink
Add PR review suggestions from joanestebanr
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Aug 1, 2024
1 parent b1a399b commit 5f2c29f
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 60 deletions.
14 changes: 7 additions & 7 deletions aggoracle/chaingersender/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type EVMChainGERSender struct {
}

type EVMConfig struct {
GlobalExitRootL2 common.Address `mapstructure:"GlobalExitRootL2"`
URLRPCL2 string `mapstructure:"URLRPCL2"`
ChainIDL2 uint64 `mapstructure:"ChainIDL2"`
GasOffset uint64 `mapstructure:"GasOffset"`
WaitPeriodMonitorTx cfgTypes.Duration `mapstructure:"WaitPeriodMonitorTx"`
SenderAddr common.Address `mapstructure:"SenderAddr"`
EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"`
GlobalExitRootL2Addr common.Address `mapstructure:"GlobalExitRootL2"`
URLRPCL2 string `mapstructure:"URLRPCL2"`
ChainIDL2 uint64 `mapstructure:"ChainIDL2"`
GasOffset uint64 `mapstructure:"GasOffset"`
WaitPeriodMonitorTx cfgTypes.Duration `mapstructure:"WaitPeriodMonitorTx"`
SenderAddr common.Address `mapstructure:"SenderAddr"`
EthTxManager ethtxmanager.Config `mapstructure:"EthTxManager"`
}

func NewEVMChainGERSender(
Expand Down
2 changes: 1 addition & 1 deletion aggoracle/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func commonSetup(t *testing.T) (
require.NoError(t, err)
// Syncer
dbPathSyncer := t.TempDir()
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0)
syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerL1Addr, 10, etherman.LatestBlock, reorg, l1Client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3)
require.NoError(t, err)
go syncer.Start(ctx)

Expand Down
4 changes: 2 additions & 2 deletions aggoracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func (a *AggOracle) Start(ctx context.Context) {
log.Debugf("GER %s already injected", gerToInject.Hex())
continue
}
log.Debugf("injecting new GER: %s", gerToInject.Hex())
log.Infof("injecting new GER: %s", gerToInject.Hex())
if err := a.chainSender.UpdateGERWaitUntilMined(ctx, gerToInject); err != nil {
log.Errorf("error calling updateGERWaitUntilMined, when trying to inject GER %s: %v", gerToInject.Hex(), err)
continue
}
log.Debugf("GER %s injected", gerToInject.Hex())
log.Infof("GER %s injected", gerToInject.Hex())
case <-ctx.Done():
return
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func createAggoracle(cfg config.Config, l1Client *ethclient.Client, syncer *l1in
log.Fatal(err)
}
sender, err = chaingersender.NewEVMChainGERSender(
cfg.AggOracle.EVMSender.GlobalExitRootL2,
cfg.AggOracle.EVMSender.GlobalExitRootL2Addr,
cfg.AggOracle.EVMSender.SenderAddr,
l2CLient,
ethTxManager,
Expand Down Expand Up @@ -388,6 +388,8 @@ func newL1InfoTreeSyncer(
l1Client,
cfg.L1InfoTreeSync.WaitForNewBlocksPeriod.Duration,
cfg.L1InfoTreeSync.InitialBlock,
cfg.L1InfoTreeSync.RetryAfterErrorPeriod.Duration,
cfg.L1InfoTreeSync.MaxRetryAttemptsAfterError,
)
if err != nil {
log.Fatal(err)
Expand Down
45 changes: 45 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,49 @@ SequencerPrivateKey = {}
[Aggregator.Synchronizer.Etherman]
[Aggregator.Synchronizer.Etherman.Validium]
Enabled = false
[ReorgDetectorL1]
DBPath = "/tmp/reorgdetector"
[L1InfoTreeSync]
DBPath = "/tmp/L1InfoTreeSync"
GlobalExitRootAddr="0x8464135c8F25Da09e49BC8782676a84730C318bC"
SyncBlockChunkSize=10
BlockFinality="latest"
URLRPCL1="http://test-aggoracle-l1:8545"
WaitForNewBlocksPeriod="100ms"
InitialBlock=0
[AggOracle]
TargetChainType="EVM"
URLRPCL1="http://test-aggoracle-l1:8545"
BlockFinality="latest"
WaitPeriodNextGER="100ms"
[EVMSender]
GlobalExitRootL2="0x8464135c8F25Da09e49BC8782676a84730C318bC"
URLRPCL2="http://test-aggoracle-l2:8545"
ChainIDL2=1337
GasOffset=0
WaitPeriodMonitorTx="100ms"
SenderAddr="0x70997970c51812dc3a010c7d01b50e0d17dc79c8"
[SequenceSender.EthTxManager]
FrequencyToMonitorTxs = "1s"
WaitTxToBeMined = "2s"
GetReceiptMaxTime = "250ms"
GetReceiptWaitInterval = "1s"
PrivateKeys = [
{Path = "/app/keystore/aggoracle.keystore", Password = "testonly"},
]
ForcedGas = 0
GasPriceMarginFactor = 1
MaxGasPriceLimit = 0
PersistenceFilename = "/tmp/ethtxmanager.json"
ReadPendingL1Txs = false
SafeStatusL1NumberOfBlocks = 5
FinalizedStatusL1NumberOfBlocks = 10
[SequenceSender.EthTxManager.Etherman]
URL = "http://test-aggoracle-l2"
MultiGasProvider = false
L1ChainID = 1337
HTTPHeaders = []
`
2 changes: 1 addition & 1 deletion l1infotreesync/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestE2E(t *testing.T) {
rdm.On("AddBlockToTrack", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
client, gerAddr, gerSc, err := newSimulatedClient(auth)
require.NoError(t, err)
syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0)
syncer, err := New(ctx, dbPath, gerAddr, 10, etherman.LatestBlock, rdm, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3)
require.NoError(t, err)
go syncer.Start(ctx)

Expand Down
28 changes: 16 additions & 12 deletions l1infotreesync/l1infotreesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@ const (
downloadBufferSize = 1000
)

var (
retryAfterErrorPeriod = time.Second * 10
maxRetryAttemptsAfterError = 5
)

type Config struct {
DBPath string `mapstructure:"DBPath"`
GlobalExitRootAddr common.Address `mapstructure:"GlobalExitRootAddr"`
SyncBlockChunkSize uint64 `mapstructure:"SyncBlockChunkSize"`
// TODO: BlockFinality doesnt work as per the jsonschema
BlockFinality string `jsonschema:"enum=latest,enum=safe, enum=pending, enum=finalized" mapstructure:"BlockFinality"`
URLRPCL1 string `mapstructure:"URLRPCL1"`
WaitForNewBlocksPeriod types.Duration `mapstructure:"WaitForNewBlocksPeriod"`
InitialBlock uint64 `mapstructure:"InitialBlock"`
BlockFinality string `jsonschema:"enum=latest,enum=safe, enum=pending, enum=finalized" mapstructure:"BlockFinality"`
URLRPCL1 string `mapstructure:"URLRPCL1"`
WaitForNewBlocksPeriod types.Duration `mapstructure:"WaitForNewBlocksPeriod"`
InitialBlock uint64 `mapstructure:"InitialBlock"`
RetryAfterErrorPeriod types.Duration `mapstructure:"RetryAfterErrorPeriod"`
MaxRetryAttemptsAfterError int `mapstructure:"MaxRetryAttemptsAfterError"`
}

type L1InfoTreeSync struct {
Expand All @@ -46,6 +43,8 @@ func New(
l1Client EthClienter,
waitForNewBlocksPeriod time.Duration,
initialBlock uint64,
retryAfterErrorPeriod time.Duration,
maxRetryAttemptsAfterError int,
) (*L1InfoTreeSync, error) {
processor, err := newProcessor(ctx, dbPath)
if err != nil {
Expand All @@ -56,14 +55,18 @@ func New(
if err != nil {
return nil, err
}
if lastProcessedBlock < initialBlock {
if initialBlock > 0 && lastProcessedBlock < initialBlock-1 {
err = processor.ProcessBlock(sync.Block{
Num: initialBlock,
Num: initialBlock - 1,
})
if err != nil {
return nil, err
}
}
rh := &sync.RetryHandler{
RetryAfterErrorPeriod: retryAfterErrorPeriod,
MaxRetryAttemptsAfterError: maxRetryAttemptsAfterError,
}

appender, err := buildAppender(l1Client, globalExitRoot)
if err != nil {
Expand All @@ -76,12 +79,13 @@ func New(
waitForNewBlocksPeriod,
appender,
[]common.Address{globalExitRoot},
rh,
)
if err != nil {
return nil, err
}

driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize)
driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize, rh)
if err != nil {
return nil, err
}
Expand Down
32 changes: 22 additions & 10 deletions l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ func newProcessor(ctx context.Context, dbPath string) (*processor, error) {
p := &processor{
db: db,
}
leaves, err := p.getAllLeavesHashed(ctx)

tx, err := p.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()
leaves, err := p.getAllLeavesHashed(tx)
if err != nil {
return nil, err
}
Expand All @@ -134,14 +140,8 @@ func newProcessor(ctx context.Context, dbPath string) (*processor, error) {
return p, nil
}

func (p *processor) getAllLeavesHashed(ctx context.Context) ([][32]byte, error) {
func (p *processor) getAllLeavesHashed(tx kv.Tx) ([][32]byte, error) {
// TODO: same coment about refactor that appears at ComputeMerkleProofByIndex
tx, err := p.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()

index, err := p.getLastIndex(tx)
if err == ErrNotFound || index == 0 {
return nil, nil
Expand Down Expand Up @@ -323,7 +323,6 @@ func (p *processor) getLastProcessedBlockWithTx(tx kv.Tx) (uint64, error) {
}

func (p *processor) Reorg(firstReorgedBlock uint64) error {
// TODO: Does tree need to be reorged?
tx, err := p.db.BeginRw(context.Background())
if err != nil {
return err
Expand Down Expand Up @@ -359,6 +358,17 @@ func (p *processor) Reorg(firstReorgedBlock uint64) error {
tx.Rollback()
return err
}
leaves, err := p.getAllLeavesHashed(tx)
if err != nil {
tx.Rollback()
return err
}
tree, err := l1infotree.NewL1InfoTree(treeHeight, leaves)
if err != nil {
tx.Rollback()
return err
}
p.tree = tree
return tx.Commit()
}

Expand Down Expand Up @@ -404,6 +414,7 @@ func (p *processor) ProcessBlock(b sync.Block) error {
if err != nil {
return err
}
events := make([]Event, len(b.Events))
if len(b.Events) > 0 {
var initialIndex uint32
lastIndex, err := p.getLastIndex(tx)
Expand All @@ -417,6 +428,7 @@ func (p *processor) ProcessBlock(b sync.Block) error {
}
for i, e := range b.Events {
event := e.(Event)
events = append(events, event)
leafToStore := storeLeaf{
Index: initialIndex + uint32(i),
MainnetExitRoot: event.MainnetExitRoot,
Expand Down Expand Up @@ -448,7 +460,7 @@ func (p *processor) ProcessBlock(b sync.Block) error {
tx.Rollback()
return err
}
log.Debugf("block %d processed with events: %+v", b.Num, b.Events)
log.Debugf("block %d processed with events: %+v", b.Num, events)
return tx.Commit()
}

Expand Down
7 changes: 6 additions & 1 deletion localbridgesync/localbridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func New(
return nil, err
}
}
rh := &sync.RetryHandler{
MaxRetryAttemptsAfterError: maxRetryAttemptsAfterError,
RetryAfterErrorPeriod: retryAfterErrorPeriod,
}

appender, err := buildAppender(l2Client, bridge)
if err != nil {
Expand All @@ -62,12 +66,13 @@ func New(
waitForNewBlocksPeriod,
appender,
[]common.Address{bridge},
rh,
)
if err != nil {
return nil, err
}

driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize)
driver, err := sync.NewEVMDriver(rd, processor, downloader, reorgDetectorID, downloadBufferSize, rh)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions sync/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"time"
)

var (
RetryAfterErrorPeriod = time.Second * 10
MaxRetryAttemptsAfterError = 5
)
type RetryHandler struct {
RetryAfterErrorPeriod time.Duration
MaxRetryAttemptsAfterError int
}

func RetryHandler(funcName string, attempts int) {
if attempts >= MaxRetryAttemptsAfterError {
func (h *RetryHandler) Handle(funcName string, attempts int) {
if h.MaxRetryAttemptsAfterError > -1 && attempts >= h.MaxRetryAttemptsAfterError {
log.Fatalf(
"%s failed too many times (%d)",
funcName, MaxRetryAttemptsAfterError,
funcName, h.MaxRetryAttemptsAfterError,
)
}
time.Sleep(RetryAfterErrorPeriod)
time.Sleep(h.RetryAfterErrorPeriod)
}
11 changes: 7 additions & 4 deletions sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewEVMDownloader(
waitForNewBlocksPeriod time.Duration,
appender LogAppenderMap,
adressessToQuery []common.Address,
rh *RetryHandler,
) (*EVMDownloader, error) {
finality, err := blockFinalityType.ToBlockNum()
if err != nil {
Expand All @@ -59,6 +60,7 @@ func NewEVMDownloader(
appender: appender,
topicsToQuery: topicsToQuery,
adressessToQuery: adressessToQuery,
rh: rh,
},
}, nil
}
Expand Down Expand Up @@ -106,6 +108,7 @@ type downloaderImplementation struct {
appender LogAppenderMap
topicsToQuery [][]common.Hash
adressessToQuery []common.Address
rh *RetryHandler
}

func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlockSeen uint64) (newLastBlock uint64) {
Expand All @@ -122,7 +125,7 @@ func (d *downloaderImplementation) waitForNewBlocks(ctx context.Context, lastBlo
if err != nil {
attempts++
log.Error("error getting last block num from eth client: ", err)
RetryHandler("waitForNewBlocks", attempts)
d.rh.Handle("waitForNewBlocks", attempts)
continue
}
if header.Number.Uint64() > lastBlockSeen {
Expand Down Expand Up @@ -155,7 +158,7 @@ func (d *downloaderImplementation) getEventsByBlockRange(ctx context.Context, fr
if err != nil {
attempts++
log.Error("error trying to append log: ", err)
RetryHandler("getLogs", attempts)
d.rh.Handle("getLogs", attempts)
continue
}
break
Expand All @@ -178,7 +181,7 @@ func (d *downloaderImplementation) getLogs(ctx context.Context, fromBlock, toBlo
if err != nil {
attempts++
log.Error("error calling FilterLogs to eth client: ", err)
RetryHandler("getLogs", attempts)
d.rh.Handle("getLogs", attempts)
continue
}
return logs
Expand All @@ -192,7 +195,7 @@ func (d *downloaderImplementation) getBlockHeader(ctx context.Context, blockNum
if err != nil {
attempts++
log.Errorf("error getting block header for block %d, err: %v", blockNum, err)
RetryHandler("getBlockHeader", attempts)
d.rh.Handle("getBlockHeader", attempts)
continue
}
return EVMBlockHeader{
Expand Down
Loading

0 comments on commit 5f2c29f

Please sign in to comment.