diff --git a/bridgesync/e2e_test.go b/bridgesync/e2e_test.go index d733a53e..db8be873 100644 --- a/bridgesync/e2e_test.go +++ b/bridgesync/e2e_test.go @@ -51,7 +51,7 @@ func TestBridgeEventE2E(t *testing.T) { auth, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(1337)) require.NoError(t, err) client, bridgeAddr, bridgeSc := newSimulatedClient(t, auth) - rd, err := reorgdetector.New(ctx, client.Client(), dbPathReorg) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg}) require.NoError(t, err) go rd.Start(ctx) diff --git a/bridgesync/mock_l2_test.go b/bridgesync/mock_l2_test.go index 8c37a56d..a8f33ef8 100644 --- a/bridgesync/mock_l2_test.go +++ b/bridgesync/mock_l2_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package bridgesync @@ -24,6 +24,10 @@ type L2Mock struct { func (_m *L2Mock) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { ret := _m.Called(ctx, hash) + if len(ret) == 0 { + panic("no return value specified for BlockByHash") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Block, error)); ok { @@ -50,6 +54,10 @@ func (_m *L2Mock) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo func (_m *L2Mock) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { @@ -76,6 +84,10 @@ func (_m *L2Mock) BlockByNumber(ctx context.Context, number *big.Int) (*types.Bl func (_m *L2Mock) BlockNumber(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for BlockNumber") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { @@ -100,6 +112,10 @@ func (_m *L2Mock) BlockNumber(ctx context.Context) (uint64, error) { func (_m *L2Mock) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { ret := _m.Called(ctx, call, blockNumber) + if len(ret) == 0 { + panic("no return value specified for CallContract") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg, *big.Int) ([]byte, error)); ok { @@ -126,6 +142,10 @@ func (_m *L2Mock) CallContract(ctx context.Context, call ethereum.CallMsg, block func (_m *L2Mock) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { ret := _m.Called(ctx, contract, blockNumber) + if len(ret) == 0 { + panic("no return value specified for CodeAt") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]byte, error)); ok { @@ -152,6 +172,10 @@ func (_m *L2Mock) CodeAt(ctx context.Context, contract common.Address, blockNumb func (_m *L2Mock) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error) { ret := _m.Called(ctx, call) + if len(ret) == 0 { + panic("no return value specified for EstimateGas") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg) (uint64, error)); ok { @@ -176,6 +200,10 @@ func (_m *L2Mock) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint6 func (_m *L2Mock) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { ret := _m.Called(ctx, q) + if len(ret) == 0 { + panic("no return value specified for FilterLogs") + } + var r0 []types.Log var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery) ([]types.Log, error)); ok { @@ -202,6 +230,10 @@ func (_m *L2Mock) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]typ func (_m *L2Mock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { ret := _m.Called(ctx, hash) + if len(ret) == 0 { + panic("no return value specified for HeaderByHash") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { @@ -228,6 +260,10 @@ func (_m *L2Mock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.He func (_m *L2Mock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { @@ -254,6 +290,10 @@ func (_m *L2Mock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H func (_m *L2Mock) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { ret := _m.Called(ctx, account) + if len(ret) == 0 { + panic("no return value specified for PendingCodeAt") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address) ([]byte, error)); ok { @@ -280,6 +320,10 @@ func (_m *L2Mock) PendingCodeAt(ctx context.Context, account common.Address) ([] func (_m *L2Mock) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { ret := _m.Called(ctx, account) + if len(ret) == 0 { + panic("no return value specified for PendingNonceAt") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address) (uint64, error)); ok { @@ -304,6 +348,10 @@ func (_m *L2Mock) PendingNonceAt(ctx context.Context, account common.Address) (u func (_m *L2Mock) SendTransaction(ctx context.Context, tx *types.Transaction) error { ret := _m.Called(ctx, tx) + if len(ret) == 0 { + panic("no return value specified for SendTransaction") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) error); ok { r0 = rf(ctx, tx) @@ -318,6 +366,10 @@ func (_m *L2Mock) SendTransaction(ctx context.Context, tx *types.Transaction) er func (_m *L2Mock) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { ret := _m.Called(ctx, q, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeFilterLogs") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error)); ok { @@ -344,6 +396,10 @@ func (_m *L2Mock) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuer func (_m *L2Mock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { ret := _m.Called(ctx, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { @@ -370,6 +426,10 @@ func (_m *L2Mock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) func (_m *L2Mock) SuggestGasPrice(ctx context.Context) (*big.Int, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SuggestGasPrice") + } + var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { @@ -396,6 +456,10 @@ func (_m *L2Mock) SuggestGasPrice(ctx context.Context) (*big.Int, error) { func (_m *L2Mock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SuggestGasTipCap") + } + var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { @@ -422,6 +486,10 @@ func (_m *L2Mock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (_m *L2Mock) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { ret := _m.Called(ctx, blockHash) + if len(ret) == 0 { + panic("no return value specified for TransactionCount") + } + var r0 uint var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (uint, error)); ok { @@ -446,6 +514,10 @@ func (_m *L2Mock) TransactionCount(ctx context.Context, blockHash common.Hash) ( func (_m *L2Mock) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { ret := _m.Called(ctx, blockHash, index) + if len(ret) == 0 { + panic("no return value specified for TransactionInBlock") + } + var r0 *types.Transaction var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) (*types.Transaction, error)); ok { @@ -468,13 +540,12 @@ func (_m *L2Mock) TransactionInBlock(ctx context.Context, blockHash common.Hash, return r0, r1 } -type mockConstructorTestingTNewL2Mock interface { +// NewL2Mock creates a new instance of L2Mock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewL2Mock(t interface { mock.TestingT Cleanup(func()) -} - -// NewL2Mock creates a new instance of L2Mock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewL2Mock(t mockConstructorTestingTNewL2Mock) *L2Mock { +}) *L2Mock { mock := &L2Mock{} mock.Mock.Test(t) diff --git a/cmd/run.go b/cmd/run.go index c17c4676..4b47c254 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -62,8 +62,8 @@ func start(cliCtx *cli.Context) error { components := cliCtx.StringSlice(config.FlagComponents) l1Client := runL1ClientIfNeeded(components, c.Etherman.URL) l2Client := runL2ClientIfNeeded(components, c.AggOracle.EVMSender.URLRPCL2) - reorgDetectorL1 := runReorgDetectorL1IfNeeded(cliCtx.Context, components, l1Client, c.ReorgDetectorL1.DBPath) - reorgDetectorL2 := runReorgDetectorL2IfNeeded(cliCtx.Context, components, l2Client, c.ReorgDetectorL2.DBPath) + reorgDetectorL1 := runReorgDetectorL1IfNeeded(cliCtx.Context, components, l1Client, &c.ReorgDetectorL1) + reorgDetectorL2 := runReorgDetectorL2IfNeeded(cliCtx.Context, components, l2Client, &c.ReorgDetectorL2) l1InfoTreeSync := runL1InfoTreeSyncerIfNeeded(cliCtx.Context, components, *c, l1Client, reorgDetectorL1) claimSponsor := runClaimSponsorIfNeeded(cliCtx.Context, components, l2Client, c.ClaimSponsor) l1BridgeSync := runBridgeSyncL1IfNeeded(cliCtx.Context, components, c.BridgeL1Sync, reorgDetectorL1, l1Client) @@ -408,11 +408,10 @@ func newState(c *config.Config, l2ChainID uint64, sqlDB *pgxpool.Pool) *state.St } func newReorgDetector( - ctx context.Context, - dbPath string, + cfg *reorgdetector.Config, client *ethclient.Client, ) *reorgdetector.ReorgDetector { - rd, err := reorgdetector.New(ctx, client, dbPath) + rd, err := reorgdetector.New(client, *cfg) if err != nil { log.Fatal(err) } @@ -485,20 +484,20 @@ func runL2ClientIfNeeded(components []string, urlRPCL2 string) *ethclient.Client return l2CLient } -func runReorgDetectorL1IfNeeded(ctx context.Context, components []string, l1Client *ethclient.Client, dbPath string) *reorgdetector.ReorgDetector { +func runReorgDetectorL1IfNeeded(ctx context.Context, components []string, l1Client *ethclient.Client, cfg *reorgdetector.Config) *reorgdetector.ReorgDetector { if !isNeeded([]string{SEQUENCE_SENDER, AGGREGATOR, AGGORACLE, RPC}, components) { return nil } - rd := newReorgDetector(ctx, dbPath, l1Client) + rd := newReorgDetector(cfg, l1Client) go rd.Start(ctx) return rd } -func runReorgDetectorL2IfNeeded(ctx context.Context, components []string, l2Client *ethclient.Client, dbPath string) *reorgdetector.ReorgDetector { +func runReorgDetectorL2IfNeeded(ctx context.Context, components []string, l2Client *ethclient.Client, cfg *reorgdetector.Config) *reorgdetector.ReorgDetector { if !isNeeded([]string{AGGORACLE, RPC}, components) { return nil } - rd := newReorgDetector(ctx, dbPath, l2Client) + rd := newReorgDetector(cfg, l2Client) go rd.Start(ctx) return rd } diff --git a/dataavailability/mocks_da/batch_data_provider.go b/dataavailability/mocks_da/batch_data_provider.go index 6d44a550..36e782ac 100644 --- a/dataavailability/mocks_da/batch_data_provider.go +++ b/dataavailability/mocks_da/batch_data_provider.go @@ -25,6 +25,10 @@ func (_m *BatchDataProvider) EXPECT() *BatchDataProvider_Expecter { func (_m *BatchDataProvider) GetBatchL2Data(batchNum []uint64, batchHashes []common.Hash, dataAvailabilityMessage []byte) ([][]byte, error) { ret := _m.Called(batchNum, batchHashes, dataAvailabilityMessage) + if len(ret) == 0 { + panic("no return value specified for GetBatchL2Data") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func([]uint64, []common.Hash, []byte) ([][]byte, error)); ok { @@ -77,13 +81,12 @@ func (_c *BatchDataProvider_GetBatchL2Data_Call) RunAndReturn(run func([]uint64, return _c } -type mockConstructorTestingTNewBatchDataProvider interface { +// NewBatchDataProvider creates a new instance of BatchDataProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBatchDataProvider(t interface { mock.TestingT Cleanup(func()) -} - -// NewBatchDataProvider creates a new instance of BatchDataProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBatchDataProvider(t mockConstructorTestingTNewBatchDataProvider) *BatchDataProvider { +}) *BatchDataProvider { mock := &BatchDataProvider{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/da_backender.go b/dataavailability/mocks_da/da_backender.go index 770faaa0..773e447c 100644 --- a/dataavailability/mocks_da/da_backender.go +++ b/dataavailability/mocks_da/da_backender.go @@ -29,6 +29,10 @@ func (_m *DABackender) EXPECT() *DABackender_Expecter { func (_m *DABackender) GetSequence(ctx context.Context, batchHashes []common.Hash, dataAvailabilityMessage []byte) ([][]byte, error) { ret := _m.Called(ctx, batchHashes, dataAvailabilityMessage) + if len(ret) == 0 { + panic("no return value specified for GetSequence") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, []common.Hash, []byte) ([][]byte, error)); ok { @@ -85,6 +89,10 @@ func (_c *DABackender_GetSequence_Call) RunAndReturn(run func(context.Context, [ func (_m *DABackender) Init() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Init") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -126,6 +134,10 @@ func (_c *DABackender_Init_Call) RunAndReturn(run func() error) *DABackender_Ini func (_m *DABackender) PostSequenceBanana(ctx context.Context, sequence etherman.SequenceBanana) ([]byte, error) { ret := _m.Called(ctx, sequence) + if len(ret) == 0 { + panic("no return value specified for PostSequenceBanana") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, etherman.SequenceBanana) ([]byte, error)); ok { @@ -181,6 +193,10 @@ func (_c *DABackender_PostSequenceBanana_Call) RunAndReturn(run func(context.Con func (_m *DABackender) PostSequenceElderberry(ctx context.Context, batchesData [][]byte) ([]byte, error) { ret := _m.Called(ctx, batchesData) + if len(ret) == 0 { + panic("no return value specified for PostSequenceElderberry") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, [][]byte) ([]byte, error)); ok { @@ -232,13 +248,12 @@ func (_c *DABackender_PostSequenceElderberry_Call) RunAndReturn(run func(context return _c } -type mockConstructorTestingTNewDABackender interface { +// NewDABackender creates a new instance of DABackender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDABackender(t interface { mock.TestingT Cleanup(func()) -} - -// NewDABackender creates a new instance of DABackender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewDABackender(t mockConstructorTestingTNewDABackender) *DABackender { +}) *DABackender { mock := &DABackender{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/data_manager.go b/dataavailability/mocks_da/data_manager.go index 30edf358..34345d71 100644 --- a/dataavailability/mocks_da/data_manager.go +++ b/dataavailability/mocks_da/data_manager.go @@ -29,6 +29,10 @@ func (_m *DataManager) EXPECT() *DataManager_Expecter { func (_m *DataManager) GetBatchL2Data(batchNum []uint64, batchHashes []common.Hash, dataAvailabilityMessage []byte) ([][]byte, error) { ret := _m.Called(batchNum, batchHashes, dataAvailabilityMessage) + if len(ret) == 0 { + panic("no return value specified for GetBatchL2Data") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func([]uint64, []common.Hash, []byte) ([][]byte, error)); ok { @@ -85,6 +89,10 @@ func (_c *DataManager_GetBatchL2Data_Call) RunAndReturn(run func([]uint64, []com func (_m *DataManager) PostSequenceBanana(ctx context.Context, sequence etherman.SequenceBanana) ([]byte, error) { ret := _m.Called(ctx, sequence) + if len(ret) == 0 { + panic("no return value specified for PostSequenceBanana") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, etherman.SequenceBanana) ([]byte, error)); ok { @@ -140,6 +148,10 @@ func (_c *DataManager_PostSequenceBanana_Call) RunAndReturn(run func(context.Con func (_m *DataManager) PostSequenceElderberry(ctx context.Context, batchesData [][]byte) ([]byte, error) { ret := _m.Called(ctx, batchesData) + if len(ret) == 0 { + panic("no return value specified for PostSequenceElderberry") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, [][]byte) ([]byte, error)); ok { @@ -191,13 +203,12 @@ func (_c *DataManager_PostSequenceElderberry_Call) RunAndReturn(run func(context return _c } -type mockConstructorTestingTNewDataManager interface { +// NewDataManager creates a new instance of DataManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDataManager(t interface { mock.TestingT Cleanup(func()) -} - -// NewDataManager creates a new instance of DataManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewDataManager(t mockConstructorTestingTNewDataManager) *DataManager { +}) *DataManager { mock := &DataManager{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/func_sign_type.go b/dataavailability/mocks_da/func_sign_type.go index 8d3171dc..6a343269 100644 --- a/dataavailability/mocks_da/func_sign_type.go +++ b/dataavailability/mocks_da/func_sign_type.go @@ -25,6 +25,10 @@ func (_m *FuncSignType) EXPECT() *FuncSignType_Expecter { func (_m *FuncSignType) Execute(c client.Client) ([]byte, error) { ret := _m.Called(c) + if len(ret) == 0 { + panic("no return value specified for Execute") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(client.Client) ([]byte, error)); ok { @@ -75,13 +79,12 @@ func (_c *FuncSignType_Execute_Call) RunAndReturn(run func(client.Client) ([]byt return _c } -type mockConstructorTestingTNewFuncSignType interface { +// NewFuncSignType creates a new instance of FuncSignType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewFuncSignType(t interface { mock.TestingT Cleanup(func()) -} - -// NewFuncSignType creates a new instance of FuncSignType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewFuncSignType(t mockConstructorTestingTNewFuncSignType) *FuncSignType { +}) *FuncSignType { mock := &FuncSignType{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/sequence_retriever.go b/dataavailability/mocks_da/sequence_retriever.go index 27212656..f82d9a70 100644 --- a/dataavailability/mocks_da/sequence_retriever.go +++ b/dataavailability/mocks_da/sequence_retriever.go @@ -27,6 +27,10 @@ func (_m *SequenceRetriever) EXPECT() *SequenceRetriever_Expecter { func (_m *SequenceRetriever) GetSequence(ctx context.Context, batchHashes []common.Hash, dataAvailabilityMessage []byte) ([][]byte, error) { ret := _m.Called(ctx, batchHashes, dataAvailabilityMessage) + if len(ret) == 0 { + panic("no return value specified for GetSequence") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, []common.Hash, []byte) ([][]byte, error)); ok { @@ -79,13 +83,12 @@ func (_c *SequenceRetriever_GetSequence_Call) RunAndReturn(run func(context.Cont return _c } -type mockConstructorTestingTNewSequenceRetriever interface { +// NewSequenceRetriever creates a new instance of SequenceRetriever. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSequenceRetriever(t interface { mock.TestingT Cleanup(func()) -} - -// NewSequenceRetriever creates a new instance of SequenceRetriever. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewSequenceRetriever(t mockConstructorTestingTNewSequenceRetriever) *SequenceRetriever { +}) *SequenceRetriever { mock := &SequenceRetriever{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/sequence_sender.go b/dataavailability/mocks_da/sequence_sender.go index be13b43b..f1e44741 100644 --- a/dataavailability/mocks_da/sequence_sender.go +++ b/dataavailability/mocks_da/sequence_sender.go @@ -27,6 +27,10 @@ func (_m *SequenceSender) EXPECT() *SequenceSender_Expecter { func (_m *SequenceSender) PostSequenceBanana(ctx context.Context, sequence etherman.SequenceBanana) ([]byte, error) { ret := _m.Called(ctx, sequence) + if len(ret) == 0 { + panic("no return value specified for PostSequenceBanana") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, etherman.SequenceBanana) ([]byte, error)); ok { @@ -82,6 +86,10 @@ func (_c *SequenceSender_PostSequenceBanana_Call) RunAndReturn(run func(context. func (_m *SequenceSender) PostSequenceElderberry(ctx context.Context, batchesData [][]byte) ([]byte, error) { ret := _m.Called(ctx, batchesData) + if len(ret) == 0 { + panic("no return value specified for PostSequenceElderberry") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, [][]byte) ([]byte, error)); ok { @@ -133,13 +141,12 @@ func (_c *SequenceSender_PostSequenceElderberry_Call) RunAndReturn(run func(cont return _c } -type mockConstructorTestingTNewSequenceSender interface { +// NewSequenceSender creates a new instance of SequenceSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSequenceSender(t interface { mock.TestingT Cleanup(func()) -} - -// NewSequenceSender creates a new instance of SequenceSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewSequenceSender(t mockConstructorTestingTNewSequenceSender) *SequenceSender { +}) *SequenceSender { mock := &SequenceSender{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/sequence_sender_banana.go b/dataavailability/mocks_da/sequence_sender_banana.go index faa8ba86..aca7b1a3 100644 --- a/dataavailability/mocks_da/sequence_sender_banana.go +++ b/dataavailability/mocks_da/sequence_sender_banana.go @@ -27,6 +27,10 @@ func (_m *SequenceSenderBanana) EXPECT() *SequenceSenderBanana_Expecter { func (_m *SequenceSenderBanana) PostSequenceBanana(ctx context.Context, sequence etherman.SequenceBanana) ([]byte, error) { ret := _m.Called(ctx, sequence) + if len(ret) == 0 { + panic("no return value specified for PostSequenceBanana") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, etherman.SequenceBanana) ([]byte, error)); ok { @@ -78,13 +82,12 @@ func (_c *SequenceSenderBanana_PostSequenceBanana_Call) RunAndReturn(run func(co return _c } -type mockConstructorTestingTNewSequenceSenderBanana interface { +// NewSequenceSenderBanana creates a new instance of SequenceSenderBanana. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSequenceSenderBanana(t interface { mock.TestingT Cleanup(func()) -} - -// NewSequenceSenderBanana creates a new instance of SequenceSenderBanana. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewSequenceSenderBanana(t mockConstructorTestingTNewSequenceSenderBanana) *SequenceSenderBanana { +}) *SequenceSenderBanana { mock := &SequenceSenderBanana{} mock.Mock.Test(t) diff --git a/dataavailability/mocks_da/sequence_sender_elderberry.go b/dataavailability/mocks_da/sequence_sender_elderberry.go index d877a8bc..3816fa1b 100644 --- a/dataavailability/mocks_da/sequence_sender_elderberry.go +++ b/dataavailability/mocks_da/sequence_sender_elderberry.go @@ -25,6 +25,10 @@ func (_m *SequenceSenderElderberry) EXPECT() *SequenceSenderElderberry_Expecter func (_m *SequenceSenderElderberry) PostSequenceElderberry(ctx context.Context, batchesData [][]byte) ([]byte, error) { ret := _m.Called(ctx, batchesData) + if len(ret) == 0 { + panic("no return value specified for PostSequenceElderberry") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, [][]byte) ([]byte, error)); ok { @@ -76,13 +80,12 @@ func (_c *SequenceSenderElderberry_PostSequenceElderberry_Call) RunAndReturn(run return _c } -type mockConstructorTestingTNewSequenceSenderElderberry interface { +// NewSequenceSenderElderberry creates a new instance of SequenceSenderElderberry. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSequenceSenderElderberry(t interface { mock.TestingT Cleanup(func()) -} - -// NewSequenceSenderElderberry creates a new instance of SequenceSenderElderberry. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewSequenceSenderElderberry(t mockConstructorTestingTNewSequenceSenderElderberry) *SequenceSenderElderberry { +}) *SequenceSenderElderberry { mock := &SequenceSenderElderberry{} mock.Mock.Test(t) diff --git a/l1bridge2infoindexsync/e2e_test.go b/l1bridge2infoindexsync/e2e_test.go index deb613f3..c2a5e982 100644 --- a/l1bridge2infoindexsync/e2e_test.go +++ b/l1bridge2infoindexsync/e2e_test.go @@ -12,6 +12,7 @@ import ( "github.com/0xPolygon/cdk-contracts-tooling/contracts/elderberry-paris/polygonzkevmbridgev2" "github.com/0xPolygon/cdk-contracts-tooling/contracts/elderberry-paris/polygonzkevmglobalexitrootv2" "github.com/0xPolygon/cdk/bridgesync" + cdktypes "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/l1bridge2infoindexsync" "github.com/0xPolygon/cdk/l1infotreesync" @@ -132,8 +133,9 @@ func TestE2E(t *testing.T) { require.NotEqual(t, authDeployer.From, auth.From) client, gerAddr, bridgeAddr, gerSc, bridgeSc, err := newSimulatedClient(authDeployer, auth) require.NoError(t, err) - rd, err := reorgdetector.New(ctx, client.Client(), dbPathReorg) - go rd.Start(ctx) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Second)}) + require.NoError(t, err) + require.NoError(t, rd.Start(ctx)) bridgeSync, err := bridgesync.NewL1(ctx, dbPathBridgeSync, bridgeAddr, 10, etherman.LatestBlock, rd, client.Client(), 0, time.Millisecond*10, 0, 0) require.NoError(t, err) @@ -204,7 +206,7 @@ func TestE2E(t *testing.T) { syncerUpToDate = true break } - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) errMsg = fmt.Sprintf("last block from client: %d, last block from syncer: %d", lb.NumberU64(), lpb) } require.True(t, syncerUpToDate, errMsg) diff --git a/l1infotreesync/e2e_test.go b/l1infotreesync/e2e_test.go index c1b16446..562f0e39 100644 --- a/l1infotreesync/e2e_test.go +++ b/l1infotreesync/e2e_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/0xPolygon/cdk-contracts-tooling/contracts/banana-paris/polygonzkevmglobalexitrootv2" + cdktypes "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/l1infotreesync" "github.com/0xPolygon/cdk/reorgdetector" @@ -181,8 +182,9 @@ func TestStressAndReorgs(t *testing.T) { require.NoError(t, err) client, gerAddr, verifyAddr, gerSc, verifySC, err := newSimulatedClient(auth) require.NoError(t, err) - rd, err := reorgdetector.New(ctx, client.Client(), dbPathReorg) - go rd.Start(ctx) + rd, err := reorgdetector.New(client.Client(), reorgdetector.Config{DBPath: dbPathReorg, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + require.NoError(t, err) + require.NoError(t, rd.Start(ctx)) syncer, err := l1infotreesync.New(ctx, dbPathSyncer, gerAddr, verifyAddr, 10, etherman.LatestBlock, rd, client.Client(), time.Millisecond, 0, 100*time.Millisecond, 3) require.NoError(t, err) go syncer.Start(ctx) diff --git a/l1infotreesync/mock_reorgdetector_test.go b/l1infotreesync/mock_reorgdetector_test.go index 22d174d4..8255443e 100644 --- a/l1infotreesync/mock_reorgdetector_test.go +++ b/l1infotreesync/mock_reorgdetector_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package l1infotreesync @@ -21,6 +21,10 @@ type ReorgDetectorMock struct { func (_m *ReorgDetectorMock) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error { ret := _m.Called(ctx, id, blockNum, blockHash) + if len(ret) == 0 { + panic("no return value specified for AddBlockToTrack") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint64, common.Hash) error); ok { r0 = rf(ctx, id, blockNum, blockHash) @@ -35,6 +39,10 @@ func (_m *ReorgDetectorMock) AddBlockToTrack(ctx context.Context, id string, blo func (_m *ReorgDetectorMock) Subscribe(id string) (*reorgdetector.Subscription, error) { ret := _m.Called(id) + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + var r0 *reorgdetector.Subscription var r1 error if rf, ok := ret.Get(0).(func(string) (*reorgdetector.Subscription, error)); ok { @@ -57,13 +65,12 @@ func (_m *ReorgDetectorMock) Subscribe(id string) (*reorgdetector.Subscription, return r0, r1 } -type mockConstructorTestingTNewReorgDetectorMock interface { +// NewReorgDetectorMock creates a new instance of ReorgDetectorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReorgDetectorMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewReorgDetectorMock creates a new instance of ReorgDetectorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewReorgDetectorMock(t mockConstructorTestingTNewReorgDetectorMock) *ReorgDetectorMock { +}) *ReorgDetectorMock { mock := &ReorgDetectorMock{} mock.Mock.Test(t) diff --git a/reorgdetector/config.go b/reorgdetector/config.go new file mode 100644 index 00000000..c9a90415 --- /dev/null +++ b/reorgdetector/config.go @@ -0,0 +1,29 @@ +package reorgdetector + +import ( + "time" + + "github.com/0xPolygon/cdk/config/types" +) + +const ( + defaultCheckReorgsInterval = 2 * time.Second +) + +// Config is the configuration for the reorg detector +type Config struct { + // DBPath is the path to the database + DBPath string `mapstructure:"DBPath"` + + // CheckReorgsInterval is the interval to check for reorgs in tracked blocks + CheckReorgsInterval types.Duration `mapstructure:"CheckReorgsInterval"` +} + +// GetCheckReorgsInterval returns the interval to check for reorgs in tracked blocks +func (c *Config) GetCheckReorgsInterval() time.Duration { + if c.CheckReorgsInterval.Duration == 0 { + return defaultCheckReorgsInterval + } + + return c.CheckReorgsInterval.Duration +} diff --git a/reorgdetector/mock_eth_client.go b/reorgdetector/mock_eth_client.go index 85376cc4..a76c62f9 100644 --- a/reorgdetector/mock_eth_client.go +++ b/reorgdetector/mock_eth_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package reorgdetector @@ -6,6 +6,10 @@ import ( context "context" big "math/big" + common "github.com/ethereum/go-ethereum/common" + + ethereum "github.com/ethereum/go-ethereum" + mock "github.com/stretchr/testify/mock" types "github.com/ethereum/go-ethereum/core/types" @@ -16,23 +20,29 @@ type EthClientMock struct { mock.Mock } -// BlockNumber provides a mock function with given fields: ctx -func (_m *EthClientMock) BlockNumber(ctx context.Context) (uint64, error) { - ret := _m.Called(ctx) +// HeaderByHash provides a mock function with given fields: ctx, hash +func (_m *EthClientMock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for HeaderByHash") + } - var r0 uint64 + var r0 *types.Header var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { - return rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { + return rf(ctx, hash) } - if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { - r0 = rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.Header); ok { + r0 = rf(ctx, hash) } else { - r0 = ret.Get(0).(uint64) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Header) + } } - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, hash) } else { r1 = ret.Error(1) } @@ -44,6 +54,10 @@ func (_m *EthClientMock) BlockNumber(ctx context.Context) (uint64, error) { func (_m *EthClientMock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { @@ -66,13 +80,42 @@ func (_m *EthClientMock) HeaderByNumber(ctx context.Context, number *big.Int) (* return r0, r1 } -type mockConstructorTestingTNewEthClientMock interface { - mock.TestingT - Cleanup(func()) +// SubscribeNewHead provides a mock function with given fields: ctx, ch +func (_m *EthClientMock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + ret := _m.Called(ctx, ch) + + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + + var r0 ethereum.Subscription + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { + return rf(ctx, ch) + } + if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) ethereum.Subscription); ok { + r0 = rf(ctx, ch) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ethereum.Subscription) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, chan<- *types.Header) error); ok { + r1 = rf(ctx, ch) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // NewEthClientMock creates a new instance of EthClientMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEthClientMock(t mockConstructorTestingTNewEthClientMock) *EthClientMock { +// The first argument is typically a *testing.T value. +func NewEthClientMock(t interface { + mock.TestingT + Cleanup(func()) +}) *EthClientMock { mock := &EthClientMock{} mock.Mock.Test(t) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 9eb631aa..22c4693e 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -2,592 +2,195 @@ package reorgdetector import ( "context" - "encoding/json" - "errors" + "fmt" "math/big" - "sort" "sync" "time" "github.com/0xPolygon/cdk/log" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "golang.org/x/sync/errgroup" ) -// TODO: consider the case where blocks can disappear, current implementation assumes that if there is a reorg, -// the client will have at least as many blocks as it had before the reorg, however this may not be the case for L2 - -const ( - defaultWaitPeriodBlockRemover = time.Second * 20 - defaultWaitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain - - subscriberBlocks = "reorgdetector-subscriberBlocks" - - unfinalisedBlocksID = "unfinalisedBlocks" -) - -var ( - ErrNotSubscribed = errors.New("id not found in subscriptions") - ErrInvalidBlockHash = errors.New("the block hash does not match with the expected block hash") - ErrIDReserverd = errors.New("subscription id is reserved") -) - -func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - subscriberBlocks: {}, - } -} - type EthClient interface { + SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) + HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) - BlockNumber(ctx context.Context) (uint64, error) -} - -type block struct { - Num uint64 - Hash common.Hash -} - -type blockMap map[uint64]block - -// newBlockMap returns a new instance of blockMap -func newBlockMap(blocks ...block) blockMap { - blockMap := make(blockMap, len(blocks)) - - for _, b := range blocks { - blockMap[b.Num] = b - } - - return blockMap -} - -// getSorted returns blocks in sorted order -func (bm blockMap) getSorted() []block { - sortedBlocks := make([]block, 0, len(bm)) - - for _, b := range bm { - sortedBlocks = append(sortedBlocks, b) - } - - sort.Slice(sortedBlocks, func(i, j int) bool { - return sortedBlocks[i].Num < sortedBlocks[j].Num - }) - - return sortedBlocks -} - -// getFromBlockSorted returns blocks from blockNum in sorted order without including the blockNum -func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { - sortedBlocks := bm.getSorted() - - index := -1 - for i, b := range sortedBlocks { - if b.Num > blockNum { - index = i - break - } - } - - if index == -1 { - return []block{} - } - - return sortedBlocks[index:] -} - -// getClosestHigherBlock returns the closest higher block to the given blockNum -func (bm blockMap) getClosestHigherBlock(blockNum uint64) (block, bool) { - if block, ok := bm[blockNum]; ok { - return block, true - } - - sorted := bm.getFromBlockSorted(blockNum) - if len(sorted) == 0 { - return block{}, false - } - - return sorted[0], true -} - -// removeRange removes blocks from from to to -func (bm blockMap) removeRange(from, to uint64) { - for i := from; i <= to; i++ { - delete(bm, i) - } -} - -type Subscription struct { - FirstReorgedBlock chan uint64 - ReorgProcessed chan bool - pendingReorgsToBeProcessed sync.WaitGroup } type ReorgDetector struct { - ethClient EthClient - - subscriptionsLock sync.RWMutex - subscriptions map[string]*Subscription + client EthClient + db kv.RwDB + checkReorgInterval time.Duration trackedBlocksLock sync.RWMutex - trackedBlocks map[string]blockMap - - db kv.RwDB - - waitPeriodBlockRemover time.Duration - waitPeriodBlockAdder time.Duration -} + trackedBlocks map[string]*headersList -type Config struct { - DBPath string `mapstructure:"DBPath"` + subscriptionsLock sync.RWMutex + subscriptions map[string]*Subscription } -// New creates a new instance of ReorgDetector -func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector, error) { +func New(client EthClient, cfg Config) (*ReorgDetector, error) { db, err := mdbx.NewMDBX(nil). - Path(dbPath). + Path(cfg.DBPath). WithTableCfg(tableCfgFunc). Open() if err != nil { - return nil, err - } - - return newReorgDetector(ctx, client, db) -} - -// newReorgDetector creates a new instance of ReorgDetector -func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) { - return newReorgDetectorWithPeriods(ctx, client, db, defaultWaitPeriodBlockRemover, defaultWaitPeriodBlockAdder) -} - -// newReorgDetectorWithPeriods creates a new instance of ReorgDetector with custom wait periods -func newReorgDetectorWithPeriods(ctx context.Context, client EthClient, db kv.RwDB, - waitPeriodBlockRemover, waitPeriodBlockAdder time.Duration) (*ReorgDetector, error) { - r := &ReorgDetector{ - ethClient: client, - db: db, - subscriptions: make(map[string]*Subscription, 0), - waitPeriodBlockRemover: waitPeriodBlockRemover, - waitPeriodBlockAdder: waitPeriodBlockAdder, - } - - trackedBlocks, err := r.getTrackedBlocks(ctx) - if err != nil { - return nil, err - } - - r.trackedBlocks = trackedBlocks - - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { - return nil, err - } - - if err = r.cleanStoredSubsBeforeStart(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { - return nil, err - } - - return r, nil -} + return nil, fmt.Errorf("failed to open db: %w", err) + } + + return &ReorgDetector{ + client: client, + db: db, + checkReorgInterval: cfg.GetCheckReorgsInterval(), + trackedBlocks: make(map[string]*headersList), + subscriptions: make(map[string]*Subscription), + }, nil +} + +// Start starts the reorg detector +func (rd *ReorgDetector) Start(ctx context.Context) (err error) { + // Load tracked blocks from the DB + if err = rd.loadTrackedHeaders(ctx); err != nil { + return fmt.Errorf("failed to load tracked headers: %w", err) + } + + // Continuously check reorgs in tracked by subscribers blocks + go func() { + ticker := time.NewTicker(rd.checkReorgInterval) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + if err = rd.detectReorgInTrackedList(ctx); err != nil { + log.Errorf("failed to detect reorg in tracked list: %v", err) + } + } + } + }() -func (r *ReorgDetector) Start(ctx context.Context) { - go r.removeFinalisedBlocks(ctx) - go r.addUnfinalisedBlocks(ctx) + return nil } -func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) { - if id == unfinalisedBlocksID { - return nil, ErrIDReserverd +// AddBlockToTrack adds a block to the tracked list for a subscriber +func (rd *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, num uint64, hash common.Hash) error { + // Skip if the given block has already been stored + rd.trackedBlocksLock.RLock() + trackedBlocks, ok := rd.trackedBlocks[id] + if !ok { + rd.trackedBlocksLock.RUnlock() + return fmt.Errorf("subscriber %s is not subscribed", id) } + rd.trackedBlocksLock.RUnlock() - r.subscriptionsLock.Lock() - defer r.subscriptionsLock.Unlock() - - if sub, ok := r.subscriptions[id]; ok { - return sub, nil + if existingHeader := trackedBlocks.get(num); existingHeader != nil && existingHeader.Hash == hash { + return nil } - sub := &Subscription{ - FirstReorgedBlock: make(chan uint64), - ReorgProcessed: make(chan bool), - } - r.subscriptions[id] = sub - - return sub, nil -} - -func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error { - return nil - // COMENTING THE CODE AS I'M SUSPECTING A DEATHLOCK - // r.subscriptionsLock.RLock() - // if sub, ok := r.subscriptions[id]; !ok { - // r.subscriptionsLock.RUnlock() - // return ErrNotSubscribed - // } else { - // // In case there are reorgs being processed, wait - // // Note that this also makes any addition to trackedBlocks[id] safe - // sub.pendingReorgsToBeProcessed.Wait() - // } - - // r.subscriptionsLock.RUnlock() - - // if actualHash, ok := r.getUnfinalisedBlocksMap()[blockNum]; ok { - // if actualHash.Hash == blockHash { - // return r.saveTrackedBlock(ctx, id, block{Num: blockNum, Hash: blockHash}) - // } else { - // return ErrInvalidBlockHash - // } - // } else { - // // ReorgDetector has not added the requested block yet, - // // so we add it to the unfinalised blocks and then to the subscriber blocks as well - // block := block{Num: blockNum, Hash: blockHash} - // if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block); err != nil { - // return err - // } - - // return r.saveTrackedBlock(ctx, id, block) - // } -} - -func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error { - blocksGotten := make(map[uint64]common.Hash, 0) - - r.trackedBlocksLock.Lock() - defer r.trackedBlocksLock.Unlock() - - for id, blocks := range r.trackedBlocks { - r.subscriptionsLock.Lock() - r.subscriptions[id] = &Subscription{ - FirstReorgedBlock: make(chan uint64), - ReorgProcessed: make(chan bool), - } - r.subscriptionsLock.Unlock() - - var ( - lastTrackedBlock uint64 - block block - actualBlockHash common.Hash - ok bool - ) - - if len(blocks) == 0 { - continue // nothing to process for this subscriber - } - - sortedBlocks := blocks.getSorted() - lastTrackedBlock = sortedBlocks[len(blocks)-1].Num - - for _, block = range sortedBlocks { - if actualBlockHash, ok = blocksGotten[block.Num]; !ok { - actualBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(block.Num))) - if err != nil { - return err - } - - actualBlockHash = actualBlock.Hash() - } - - if actualBlockHash != block.Hash { - // reorg detected, notify subscriber - go r.notifyReorgToSubscription(id, block.Num) - - // remove the reorged blocks from the tracked blocks - blocks.removeRange(block.Num, lastTrackedBlock) - - break - } else if block.Num <= latestFinalisedBlock { - delete(blocks, block.Num) - } - } - // if we processed finalized or reorged blocks, update the tracked blocks in memory and db - if err := r.updateTrackedBlocksNoLock(ctx, id, blocks); err != nil { - return err - } + // Store the given header to the tracked list + hdr := newHeader(num, hash) + if err := rd.saveTrackedBlock(ctx, id, hdr); err != nil { + return fmt.Errorf("failed to save tracked block: %w", err) } return nil } -func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { - ticker := time.NewTicker(r.waitPeriodBlockRemover) - - for { - select { - case <-ticker.C: - lastFinalisedBlock, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) - if err != nil { - log.Error("reorg detector - error getting last finalised block", "err", err) - - continue - } - - if err := r.removeTrackedBlocks(ctx, lastFinalisedBlock.Number.Uint64()); err != nil { - log.Error("reorg detector - error removing tracked blocks", "err", err) - - continue - } - case <-ctx.Done(): - return - } +// detectReorgInTrackedList detects reorgs in the tracked blocks. +// Notifies subscribers if reorg has happened +func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { + // Get the latest finalized block + lastFinalisedBlock, err := rd.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + return fmt.Errorf("failed to get the latest finalized block: %w", err) } -} -func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { var ( - lastUnfinalisedBlock uint64 - ticker = time.NewTicker(r.waitPeriodBlockAdder) - unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() - prevBlock *types.Header - lastBlockFromClient *types.Header - err error + headersCacheLock sync.Mutex + headersCache = map[uint64]*types.Header{ + lastFinalisedBlock.Number.Uint64(): lastFinalisedBlock, + } + errGroup errgroup.Group ) - if len(unfinalisedBlocksMap) > 0 { - lastUnfinalisedBlock = unfinalisedBlocksMap.getSorted()[len(unfinalisedBlocksMap)-1].Num - } - - for { - select { - case <-ticker.C: - lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) - if err != nil { - log.Error("reorg detector - error getting last block from client", "err", err) - continue - } - - if lastBlockFromClient.Number.Uint64() < lastUnfinalisedBlock { - // a reorg probably happened, and the client has less blocks than we have - // we should wait for the client to catch up so we can be sure - continue - } - - unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() - if len(unfinalisedBlocksMap) == 0 { - // no unfinalised blocks, just add this block to the map - if err := r.saveTrackedBlock(ctx, unfinalisedBlocksID, block{ - Num: lastBlockFromClient.Number.Uint64(), - Hash: lastBlockFromClient.Hash(), - }); err != nil { - log.Error("reorg detector - error saving unfinalised block", "block", lastBlockFromClient.Number.Uint64(), "err", err) + rd.trackedBlocksLock.Lock() + defer rd.trackedBlocksLock.Unlock() + + for id, hdrs := range rd.trackedBlocks { + id := id + hdrs := hdrs + + errGroup.Go(func() error { + headers := hdrs.getSorted() + for _, hdr := range headers { + // Get the actual header from the network or from the cache + headersCacheLock.Lock() + currentHeader, ok := headersCache[hdr.Num] + if !ok || currentHeader == nil { + if currentHeader, err = rd.client.HeaderByNumber(ctx, big.NewInt(int64(hdr.Num))); err != nil { + headersCacheLock.Unlock() + return fmt.Errorf("failed to get the header: %w", err) + } + headersCache[hdr.Num] = currentHeader } - - continue - } - - startBlock := lastBlockFromClient - unfinalisedBlocksSorted := unfinalisedBlocksMap.getSorted() - reorgBlock := uint64(0) - - for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- { - previousBlock, ok := unfinalisedBlocksMap[i-1] - if !ok { - prevBlock, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) - if err != nil { - log.Error("reorg detector - error getting previous block", "block", i-1, "err", err) - break // stop processing blocks, and we will try to detect it in the next iteration + headersCacheLock.Unlock() + + // Check if the block hash matches with the actual block hash + if hdr.Hash == currentHeader.Hash() { + // Delete block from the tracked blocks list if it is less than or equal to the last finalized block + // and hashes matches. If higher than finalized block, we assume a reorg still might happen. + if hdr.Num <= lastFinalisedBlock.Number.Uint64() { + hdrs.removeRange(hdr.Num, hdr.Num) } - previousBlock = block{Num: prevBlock.Number.Uint64(), Hash: prevBlock.Hash()} + continue } - if previousBlock.Hash == lastBlockFromClient.ParentHash { - unfinalisedBlocksMap[i] = block{Num: lastBlockFromClient.Number.Uint64(), Hash: lastBlockFromClient.Hash()} - } else if previousBlock.Hash != lastBlockFromClient.ParentHash { - // reorg happened, we will find out from where exactly and report this to subscribers - reorgBlock = i - } + // Notify the subscriber about the reorg + rd.notifySubscriber(id, hdr) - lastBlockFromClient, err = r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(i-1))) - if err != nil { - log.Error("reorg detector - error getting last block from client", "err", err) - break // stop processing blocks, and we will try to detect it in the next iteration - } - } + // Remove the reorged block and all the following blocks + hdrs.removeRange(hdr.Num, headers[len(headers)-1].Num) - if err == nil { - // if we noticed an error, do not notify or update tracked blocks - if reorgBlock > 0 { - r.notifyReorgToAllSubscriptions(reorgBlock) - } else { - r.updateTrackedBlocks(ctx, unfinalisedBlocksID, unfinalisedBlocksMap) - } + break } - case <-ctx.Done(): - return - } - } -} - -func (r *ReorgDetector) notifyReorgToAllSubscriptions(reorgBlock uint64) { - r.subscriptionsLock.RLock() - defer r.subscriptionsLock.RUnlock() - - for id, sub := range r.subscriptions { - r.trackedBlocksLock.RLock() - subscriberBlocks := r.trackedBlocks[id] - r.trackedBlocksLock.RUnlock() - - closestBlock, exists := subscriberBlocks.getClosestHigherBlock(reorgBlock) - - if exists { - go r.notifyReorgToSub(sub, closestBlock.Num) - // remove reorged blocks from tracked blocks - sorted := subscriberBlocks.getSorted() - subscriberBlocks.removeRange(closestBlock.Num, sorted[len(sorted)-1].Num) - if err := r.updateTrackedBlocks(context.Background(), id, subscriberBlocks); err != nil { - log.Error("reorg detector - error updating tracked blocks", "err", err) + // Update the tracked blocks in the DB + if err := rd.updateTrackedBlocksDB(ctx, id, hdrs); err != nil { + return fmt.Errorf("failed to update tracked blocks for subscriber %s: %w", id, err) } - } - } -} - -func (r *ReorgDetector) notifyReorgToSubscription(id string, reorgBlock uint64) { - if id == unfinalisedBlocksID { - // unfinalised blocks are not subscribers, and reorg block should be > 0 - return - } - - r.subscriptionsLock.RLock() - sub := r.subscriptions[id] - r.subscriptionsLock.RUnlock() - - r.notifyReorgToSub(sub, reorgBlock) -} - -func (r *ReorgDetector) notifyReorgToSub(sub *Subscription, reorgBlock uint64) { - sub.pendingReorgsToBeProcessed.Add(1) - - // notify about the first reorged block that was tracked - // and wait for the receiver to process - sub.FirstReorgedBlock <- reorgBlock - <-sub.ReorgProcessed - - sub.pendingReorgsToBeProcessed.Done() -} -// getUnfinalisedBlocksMap returns the map of unfinalised blocks -func (r *ReorgDetector) getUnfinalisedBlocksMap() blockMap { - r.trackedBlocksLock.RLock() - defer r.trackedBlocksLock.RUnlock() - - return r.trackedBlocks[unfinalisedBlocksID] -} - -// getTrackedBlocks returns a list of tracked blocks for each subscriber from db -func (r *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]blockMap, error) { - tx, err := r.db.BeginRo(ctx) - if err != nil { - return nil, err - } - - defer tx.Rollback() - - cursor, err := tx.Cursor(subscriberBlocks) - if err != nil { - return nil, err - } - - defer cursor.Close() - - trackedBlocks := make(map[string]blockMap, 0) - - for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { - if err != nil { - return nil, err - } - - var blocks []block - if err := json.Unmarshal(v, &blocks); err != nil { - return nil, err - } - - trackedBlocks[string(k)] = newBlockMap(blocks...) - } - - if _, ok := trackedBlocks[unfinalisedBlocksID]; !ok { - // add unfinalised blocks to tracked blocks map if not present in db - trackedBlocks[unfinalisedBlocksID] = newBlockMap() + return nil + }) } - return trackedBlocks, nil + return errGroup.Wait() } -// saveTrackedBlock saves the tracked block for a subscriber in db and in memory -func (r *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b block) error { - tx, err := r.db.BeginRw(ctx) - if err != nil { - return err - } - - defer tx.Rollback() - - r.trackedBlocksLock.Lock() - - subscriberBlockMap, ok := r.trackedBlocks[id] - if !ok { - subscriberBlockMap = newBlockMap(b) - r.trackedBlocks[id] = subscriberBlockMap - } - - r.trackedBlocksLock.Unlock() - - raw, err := json.Marshal(subscriberBlockMap.getSorted()) - if err != nil { +// loadTrackedHeaders loads tracked headers from the DB and stores them in memory +func (rd *ReorgDetector) loadTrackedHeaders(ctx context.Context) (err error) { + rd.trackedBlocksLock.Lock() + defer rd.trackedBlocksLock.Unlock() - return err + // Load tracked blocks for all subscribers from the DB + if rd.trackedBlocks, err = rd.getTrackedBlocks(ctx); err != nil { + return fmt.Errorf("failed to get tracked blocks: %w", err) } - return tx.Put(subscriberBlocks, []byte(id), raw) -} - -// removeTrackedBlocks removes the tracked blocks for a subscriber in db and in memory -func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBlock uint64) error { - r.subscriptionsLock.RLock() - defer r.subscriptionsLock.RUnlock() - - for id := range r.subscriptions { - r.trackedBlocksLock.RLock() - newTrackedBlocks := r.trackedBlocks[id].getFromBlockSorted(lastFinalizedBlock) - r.trackedBlocksLock.RUnlock() - - if err := r.updateTrackedBlocks(ctx, id, newBlockMap(newTrackedBlocks...)); err != nil { - return err + // Go over tracked blocks and create subscription for each tracker + for id := range rd.trackedBlocks { + rd.subscriptions[id] = &Subscription{ + ReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), } } return nil } - -// updateTrackedBlocks updates the tracked blocks for a subscriber in db and in memory -func (r *ReorgDetector) updateTrackedBlocks(ctx context.Context, id string, blocks blockMap) error { - r.trackedBlocksLock.Lock() - defer r.trackedBlocksLock.Unlock() - - return r.updateTrackedBlocksNoLock(ctx, id, blocks) -} - -// updateTrackedBlocksNoLock updates the tracked blocks for a subscriber in db and in memory -func (r *ReorgDetector) updateTrackedBlocksNoLock(ctx context.Context, id string, blocks blockMap) error { - tx, err := r.db.BeginRw(ctx) - if err != nil { - return err - } - - defer tx.Rollback() - - raw, err := json.Marshal(blocks.getSorted()) - if err != nil { - return err - } - - if err := tx.Put(subscriberBlocks, []byte(id), raw); err != nil { - return err - } - - r.trackedBlocks[id] = blocks - - return nil -} diff --git a/reorgdetector/reorgdetector_db.go b/reorgdetector/reorgdetector_db.go new file mode 100644 index 00000000..3174cbc0 --- /dev/null +++ b/reorgdetector/reorgdetector_db.go @@ -0,0 +1,100 @@ +package reorgdetector + +import ( + "context" + "encoding/json" + + "github.com/ledgerwatch/erigon-lib/kv" +) + +const ( + subscriberBlocks = "reorgdetector-subscriberBlocks" +) + +func tableCfgFunc(_ kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + subscriberBlocks: {}, + } +} + +// getTrackedBlocks returns a list of tracked blocks for each subscriber from db +func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*headersList, error) { + tx, err := rd.db.BeginRo(ctx) + if err != nil { + return nil, err + } + + defer tx.Rollback() + + cursor, err := tx.Cursor(subscriberBlocks) + if err != nil { + return nil, err + } + + defer cursor.Close() + + trackedBlocks := make(map[string]*headersList, 0) + + for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { + if err != nil { + return nil, err + } + + var headers []header + if err := json.Unmarshal(v, &headers); err != nil { + return nil, err + } + + trackedBlocks[string(k)] = newHeadersList(headers...) + } + + return trackedBlocks, nil +} + +// saveTrackedBlock saves the tracked block for a subscriber in db and in memory +func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error { + tx, err := rd.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + rd.trackedBlocksLock.Lock() + hdrs, ok := rd.trackedBlocks[id] + if !ok || hdrs.isEmpty() { + hdrs = newHeadersList(b) + rd.trackedBlocks[id] = hdrs + } else { + hdrs.add(b) + } + rd.trackedBlocksLock.Unlock() + + raw, err := json.Marshal(hdrs.getSorted()) + if err != nil { + return err + } + + return tx.Put(subscriberBlocks, []byte(id), raw) +} + +// updateTrackedBlocksDB updates the tracked blocks for a subscriber in db +func (rd *ReorgDetector) updateTrackedBlocksDB(ctx context.Context, id string, blocks *headersList) error { + tx, err := rd.db.BeginRw(ctx) + if err != nil { + return err + } + + defer tx.Rollback() + + raw, err := json.Marshal(blocks.getSorted()) + if err != nil { + return err + } + + if err = tx.Put(subscriberBlocks, []byte(id), raw); err != nil { + return err + } + + return nil +} diff --git a/reorgdetector/reorgdetector_sub.go b/reorgdetector/reorgdetector_sub.go new file mode 100644 index 00000000..675a81c5 --- /dev/null +++ b/reorgdetector/reorgdetector_sub.go @@ -0,0 +1,42 @@ +package reorgdetector + +// Subscription is a subscription to reorg events +type Subscription struct { + ReorgedBlock chan uint64 + ReorgProcessed chan bool +} + +// Subscribe subscribes to reorg events +func (rd *ReorgDetector) Subscribe(id string) (*Subscription, error) { + rd.subscriptionsLock.Lock() + defer rd.subscriptionsLock.Unlock() + + if sub, ok := rd.subscriptions[id]; ok { + return sub, nil + } + + // Create a new subscription + sub := &Subscription{ + ReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + } + rd.subscriptions[id] = sub + + // Create a new tracked blocks list for the subscriber + rd.trackedBlocksLock.Lock() + rd.trackedBlocks[id] = newHeadersList() + rd.trackedBlocksLock.Unlock() + + return sub, nil +} + +// notifySubscriber notifies the subscriber with the block of the reorg +func (rd *ReorgDetector) notifySubscriber(id string, startingBlock header) { + // Notify subscriber about this particular reorg + rd.subscriptionsLock.RLock() + if sub, ok := rd.subscriptions[id]; ok { + sub.ReorgedBlock <- startingBlock.Num + <-sub.ReorgProcessed + } + rd.subscriptionsLock.RUnlock() +} diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 275f89a8..7adec4ca 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -1,467 +1,97 @@ package reorgdetector -/* import ( "context" - "encoding/json" - "errors" - "fmt" big "math/big" - "os" - "reflect" "testing" "time" + cdktypes "github.com/0xPolygon/cdk/config/types" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - types "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient/simulated" "github.com/stretchr/testify/require" ) +func newSimulatedL1(t *testing.T, auth *bind.TransactOpts) *simulated.Backend { + t.Helper() -const testSubscriber = "testSubscriber" - -// newTestDB creates new instance of db used by tests. -func newTestDB(tb testing.TB) kv.RwDB { - tb.Helper() - - dir := fmt.Sprintf("/tmp/reorgdetector-temp_%v", time.Now().UTC().Format(time.RFC3339Nano)) - err := os.Mkdir(dir, 0775) - - require.NoError(tb, err) - - db, err := mdbx.NewMDBX(nil). - Path(dir). - WithTableCfg(tableCfgFunc). - Open() - require.NoError(tb, err) - - tb.Cleanup(func() { - require.NoError(tb, os.RemoveAll(dir)) - }) - - return db -} - -func TestBlockMap(t *testing.T) { - t.Parallel() - - // Create a new block map - bm := newBlockMap( - block{Num: 1, Hash: common.HexToHash("0x123")}, - block{Num: 2, Hash: common.HexToHash("0x456")}, - block{Num: 3, Hash: common.HexToHash("0x789")}, - ) - - t.Run("getSorted", func(t *testing.T) { - t.Parallel() - - sortedBlocks := bm.getSorted() - expectedSortedBlocks := []block{ - {Num: 1, Hash: common.HexToHash("0x123")}, - {Num: 2, Hash: common.HexToHash("0x456")}, - {Num: 3, Hash: common.HexToHash("0x789")}, - } - if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { - t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) - } - }) - - t.Run("getFromBlockSorted", func(t *testing.T) { - t.Parallel() - - fromBlockSorted := bm.getFromBlockSorted(2) - expectedFromBlockSorted := []block{ - {Num: 3, Hash: common.HexToHash("0x789")}, - } - if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { - t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) - } - - // Test getFromBlockSorted function when blockNum is greater than the last block - fromBlockSorted = bm.getFromBlockSorted(4) - expectedFromBlockSorted = []block{} - if !reflect.DeepEqual(fromBlockSorted, expectedFromBlockSorted) { - t.Errorf("getFromBlockSorted() returned incorrect result, expected: %v, got: %v", expectedFromBlockSorted, fromBlockSorted) - } - }) - - t.Run("getClosestHigherBlock", func(t *testing.T) { - t.Parallel() - - bm := newBlockMap( - block{Num: 1, Hash: common.HexToHash("0x123")}, - block{Num: 2, Hash: common.HexToHash("0x456")}, - block{Num: 3, Hash: common.HexToHash("0x789")}, - ) - - // Test when the blockNum exists in the block map - b, exists := bm.getClosestHigherBlock(2) - require.True(t, exists) - expectedBlock := block{Num: 2, Hash: common.HexToHash("0x456")} - if b != expectedBlock { - t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) - } - - // Test when the blockNum does not exist in the block map - b, exists = bm.getClosestHigherBlock(4) - require.False(t, exists) - expectedBlock = block{Num: 0, Hash: common.Hash{}} - if b != expectedBlock { - t.Errorf("getClosestHigherBlock() returned incorrect result, expected: %v, got: %v", expectedBlock, b) - } - }) - - t.Run("removeRange", func(t *testing.T) { - t.Parallel() - - bm := newBlockMap( - block{Num: 1, Hash: common.HexToHash("0x123")}, - block{Num: 2, Hash: common.HexToHash("0x456")}, - block{Num: 3, Hash: common.HexToHash("0x789")}, - block{Num: 4, Hash: common.HexToHash("0xabc")}, - block{Num: 5, Hash: common.HexToHash("0xdef")}, - ) - - bm.removeRange(3, 5) - - expectedBlocks := []block{ - {Num: 1, Hash: common.HexToHash("0x123")}, - {Num: 2, Hash: common.HexToHash("0x456")}, - } - - sortedBlocks := bm.getSorted() - - if !reflect.DeepEqual(sortedBlocks, expectedBlocks) { - t.Errorf("removeRange() failed, expected: %v, got: %v", expectedBlocks, sortedBlocks) - } - }) -} - -func TestReorgDetector_New(t *testing.T) { - t.Parallel() - - ctx := context.Background() - - t.Run("first initialization, no data", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(100)}, nil, - ) - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - require.Len(t, rd.trackedBlocks, 1) - - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] - require.True(t, exists) - require.Empty(t, unfinalisedBlocksMap) - }) - - t.Run("getting last finalized block failed", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - expectedErr := errors.New("some error") - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(nil, expectedErr) - - _, err := newReorgDetector(ctx, client, db) - require.ErrorIs(t, err, expectedErr) - }) - - t.Run("have tracked blocks and subscriptions no reorg - all blocks finalized", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - testBlocks := createTestBlocks(t, 1, 6) - unfinalisedBlocks := testBlocks[:5] - testSubscriberBlocks := testBlocks[:3] - - insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) - insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) - - for _, block := range unfinalisedBlocks { - client.On("HeaderByNumber", ctx, block.Number).Return( - block, nil, - ) - } - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - testBlocks[len(testBlocks)-1], nil, - ) - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks - - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] - require.True(t, exists) - require.Len(t, unfinalisedBlocksMap, 0) // since all blocks are finalized - - testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] - require.True(t, exists) - require.Len(t, testSubscriberMap, 0) // since all blocks are finalized - }) - - t.Run("have tracked blocks and subscriptions no reorg - not all blocks finalized", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - testBlocks := createTestBlocks(t, 1, 7) - unfinalisedBlocks := testBlocks[:6] - testSubscriberBlocks := testBlocks[:4] - - insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) - insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) - - for _, block := range unfinalisedBlocks { - client.On("HeaderByNumber", ctx, block.Number).Return( - block, nil, - ) - } - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - testSubscriberBlocks[len(testSubscriberBlocks)-1], nil, - ) - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks - - unfinalisedBlocksMap, exists := rd.trackedBlocks[unfinalisedBlocksID] - require.True(t, exists) - require.Len(t, unfinalisedBlocksMap, len(unfinalisedBlocks)-len(testSubscriberBlocks)) // since all blocks are finalized - - testSubscriberMap, exists := rd.trackedBlocks[testSubscriber] - require.True(t, exists) - require.Len(t, testSubscriberMap, 0) // since all blocks are finalized - }) - - t.Run("have tracked blocks and subscriptions - reorg happened", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - trackedBlocks := createTestBlocks(t, 1, 5) - testSubscriberBlocks := trackedBlocks[:5] - - insertTestData(t, ctx, db, nil, unfinalisedBlocksID) // no unfinalised blocks - insertTestData(t, ctx, db, testSubscriberBlocks, testSubscriber) - - for _, block := range trackedBlocks[:3] { - client.On("HeaderByNumber", ctx, block.Number).Return( - block, nil, - ) - } - - reorgedBlocks := createTestBlocks(t, 4, 2) // block 4, and 5 are reorged - reorgedBlocks[0].ParentHash = trackedBlocks[2].Hash() // block 4 is reorged but his parent is block 3 - reorgedBlocks[1].ParentHash = reorgedBlocks[0].Hash() // block 5 is reorged but his parent is block 4 - - client.On("HeaderByNumber", ctx, reorgedBlocks[0].Number).Return( - reorgedBlocks[0], nil, - ) - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - reorgedBlocks[len(reorgedBlocks)-1], nil, - ) - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - require.Len(t, rd.trackedBlocks, 2) // testSubscriber and unfinalisedBlocks + balance, _ := new(big.Int).SetString("10000000000000000000000000", 10) //nolint:gomnd - // we wait for the subscriber to be notified about the reorg - firstReorgedBlock := <-rd.subscriptions[testSubscriber].FirstReorgedBlock - require.Equal(t, reorgedBlocks[0].Number.Uint64(), firstReorgedBlock) + blockGasLimit := uint64(999999999999999999) //nolint:gomnd + client := simulated.NewBackend(map[common.Address]types.Account{ + auth.From: { + Balance: balance, + }, + }, simulated.WithBlockGasLimit(blockGasLimit)) + client.Commit() - // all blocks should be cleaned from the tracked blocks - // since subscriber had 5 blocks, 3 were finalized, and 2 were reorged but also finalized - subscriberBlocks := rd.trackedBlocks[testSubscriber] - require.Len(t, subscriberBlocks, 0) - }) + return client } -func TestReorgDetector_AddBlockToTrack(t *testing.T) { - t.Parallel() +func Test_ReorgDetector(t *testing.T) { + const subID = "test" ctx := context.Background() - t.Run("no subscription", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(10)}, nil, - ) - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - - err = rd.AddBlockToTrack(ctx, testSubscriber, 1, common.HexToHash("0x123")) - require.ErrorIs(t, err, ErrNotSubscribed) - }) - - t.Run("no unfinalised blocks - block not finalised", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(10)}, nil, - ).Once() - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - - _, err = rd.Subscribe(testSubscriber) - require.NoError(t, err) - - err = rd.AddBlockToTrack(ctx, testSubscriber, 11, common.HexToHash("0x123")) // block not finalized - require.NoError(t, err) - - subBlocks := rd.trackedBlocks[testSubscriber] - require.Len(t, subBlocks, 1) - require.Equal(t, subBlocks[11].Hash, common.HexToHash("0x123")) - }) - - t.Run("have unfinalised blocks - block not finalized", func(t *testing.T) { - t.Parallel() - - client := NewEthClientMock(t) - db := newTestDB(t) - - unfinalisedBlocks := createTestBlocks(t, 11, 5) - insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) - - for _, block := range unfinalisedBlocks { - client.On("HeaderByNumber", ctx, block.Number).Return( - block, nil, - ) - } - - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(10)}, nil, - ).Once() - - rd, err := newReorgDetector(ctx, client, db) - require.NoError(t, err) - - _, err = rd.Subscribe(testSubscriber) - require.NoError(t, err) - - err = rd.AddBlockToTrack(ctx, testSubscriber, 11, unfinalisedBlocks[0].Hash()) // block not finalized - require.NoError(t, err) - - subBlocks := rd.trackedBlocks[testSubscriber] - require.Len(t, subBlocks, 1) - require.Equal(t, subBlocks[11].Hash, unfinalisedBlocks[0].Hash()) - }) -} - -func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - client := NewEthClientMock(t) - db := newTestDB(t) - - unfinalisedBlocks := createTestBlocks(t, 1, 10) - insertTestData(t, ctx, db, unfinalisedBlocks, unfinalisedBlocksID) - insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber) - - // call for removeFinalisedBlocks - client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( - &types.Header{Number: big.NewInt(5)}, nil, - ) - - rd := &ReorgDetector{ - ethClient: client, - db: db, - trackedBlocks: make(map[string]blockMap), - waitPeriodBlockRemover: 100 * time.Millisecond, - waitPeriodBlockAdder: 100 * time.Millisecond, - subscriptions: map[string]*Subscription{ - testSubscriber: { - FirstReorgedBlock: make(chan uint64), - ReorgProcessed: make(chan bool), - }, - unfinalisedBlocksID: { - FirstReorgedBlock: make(chan uint64), - ReorgProcessed: make(chan bool), - }, - }, - } - - trackedBlocks, err := rd.getTrackedBlocks(ctx) + // Simulated L1 + privateKeyL1, err := crypto.GenerateKey() + require.NoError(t, err) + authL1, err := bind.NewKeyedTransactorWithChainID(privateKeyL1, big.NewInt(1337)) + require.NoError(t, err) + clientL1 := newSimulatedL1(t, authL1) require.NoError(t, err) - require.Len(t, trackedBlocks, 2) - - rd.trackedBlocks = trackedBlocks - - // make sure we have all blocks in the tracked blocks before removing finalized blocks - require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], len(unfinalisedBlocks)) - require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks)) - - // remove finalized blocks - go rd.removeFinalisedBlocks(ctx) - time.Sleep(3 * time.Second) // wait for the go routine to remove the finalized blocks - cancel() + // Create test DB dir + testDir := t.TempDir() - // make sure all blocks are removed from the tracked blocks - rd.trackedBlocksLock.RLock() - defer rd.trackedBlocksLock.RUnlock() + reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)}) + require.NoError(t, err) - require.Len(t, rd.trackedBlocks[unfinalisedBlocksID], 5) - require.Len(t, rd.trackedBlocks[testSubscriber], 5) -} + err = reorgDetector.Start(ctx) + require.NoError(t, err) -func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { - t.Helper() + reorgSub, err := reorgDetector.Subscribe(subID) + require.NoError(t, err) - blocks := make([]*types.Header, 0, count) - for i := startBlock; i < startBlock+count; i++ { - blocks = append(blocks, &types.Header{Number: big.NewInt(int64(i))}) + remainingHeader, err := clientL1.Client().HeaderByHash(ctx, clientL1.Commit()) // Block 2 + require.NoError(t, err) + err = reorgDetector.AddBlockToTrack(ctx, subID, remainingHeader.Number.Uint64(), remainingHeader.Hash()) // Adding block 2 + require.NoError(t, err) + reorgHeader, err := clientL1.Client().HeaderByHash(ctx, clientL1.Commit()) // Block 3 + require.NoError(t, err) + firstHeaderAfterReorg, err := clientL1.Client().HeaderByHash(ctx, clientL1.Commit()) // Block 4 + require.NoError(t, err) + err = reorgDetector.AddBlockToTrack(ctx, subID, firstHeaderAfterReorg.Number.Uint64(), firstHeaderAfterReorg.Hash()) // Adding block 4 + require.NoError(t, err) + header, err := clientL1.Client().HeaderByHash(ctx, clientL1.Commit()) // Block 5 + require.NoError(t, err) + err = reorgDetector.AddBlockToTrack(ctx, subID, header.Number.Uint64(), header.Hash()) // Adding block 5 + require.NoError(t, err) + err = clientL1.Fork(reorgHeader.Hash()) // Reorg on block 3 + require.NoError(t, err) + clientL1.Commit() // Next block 4 after reorg on block 3 + clientL1.Commit() // Block 5 + clientL1.Commit() // Block 6 + + // Expect reorg on added blocks 4 -> all further blocks should be removed + select { + case firstReorgedBlock := <-reorgSub.ReorgedBlock: + reorgSub.ReorgProcessed <- true + require.Equal(t, firstHeaderAfterReorg.Number.Uint64(), firstReorgedBlock) + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for reorg") } - return blocks -} + // just wait a little for completion + time.Sleep(time.Second / 5) -func insertTestData(t *testing.T, ctx context.Context, db kv.RwDB, blocks []*types.Header, id string) { - t.Helper() - - // Insert some test data - err := db.Update(ctx, func(tx kv.RwTx) error { - - blockMap := newBlockMap() - for _, b := range blocks { - blockMap[b.Number.Uint64()] = block{b.Number.Uint64(), b.Hash()} - } - - raw, err := json.Marshal(blockMap.getSorted()) - if err != nil { - return err - } - - return tx.Put(subscriberBlocks, []byte(id), raw) - }) - - require.NoError(t, err) + reorgDetector.trackedBlocksLock.Lock() + headersList, ok := reorgDetector.trackedBlocks[subID] + reorgDetector.trackedBlocksLock.Unlock() + require.True(t, ok) + require.Equal(t, 1, headersList.len()) // Only block 2 left + require.Equal(t, remainingHeader.Hash(), headersList.get(2).Hash) } -*/ diff --git a/reorgdetector/types.go b/reorgdetector/types.go new file mode 100644 index 00000000..bee3eb44 --- /dev/null +++ b/reorgdetector/types.go @@ -0,0 +1,112 @@ +package reorgdetector + +import ( + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" +) + +type header struct { + Num uint64 + Hash common.Hash +} + +// newHeader returns a new instance of header +func newHeader(num uint64, hash common.Hash) header { + return header{ + Num: num, + Hash: hash, + } +} + +type headersList struct { + sync.RWMutex + headers map[uint64]header +} + +// newHeadersList returns a new instance of headersList +func newHeadersList(headers ...header) *headersList { + headersMap := make(map[uint64]header, len(headers)) + + for _, b := range headers { + headersMap[b.Num] = b + } + + return &headersList{ + headers: headersMap, + } +} + +// len returns the number of headers in the headers list +func (hl *headersList) len() int { + hl.RLock() + ln := len(hl.headers) + hl.RUnlock() + return ln +} + +// isEmpty returns true if the headers list is empty +func (hl *headersList) isEmpty() bool { + return hl.len() == 0 +} + +// add adds a header to the headers list +func (hl *headersList) add(h header) { + hl.Lock() + hl.headers[h.Num] = h + hl.Unlock() +} + +// copy returns a copy of the headers list +func (hl *headersList) copy() *headersList { + hl.RLock() + defer hl.RUnlock() + + headersMap := make(map[uint64]header, len(hl.headers)) + for k, v := range hl.headers { + headersMap[k] = v + } + + return &headersList{ + headers: headersMap, + } +} + +// get returns a header by block number +func (hl *headersList) get(num uint64) *header { + hl.RLock() + defer hl.RUnlock() + + if b, ok := hl.headers[num]; ok { + return &b + } + + return nil +} + +// getSorted returns headers in sorted order +func (hl *headersList) getSorted() []header { + sortedBlocks := make([]header, 0, len(hl.headers)) + + hl.RLock() + for _, b := range hl.headers { + sortedBlocks = append(sortedBlocks, b) + } + hl.RUnlock() + + sort.Slice(sortedBlocks, func(i, j int) bool { + return sortedBlocks[i].Num < sortedBlocks[j].Num + }) + + return sortedBlocks +} + +// removeRange removes headers from "from" to "to" +func (hl *headersList) removeRange(from, to uint64) { + hl.Lock() + for i := from; i <= to; i++ { + delete(hl.headers, i) + } + hl.Unlock() +} diff --git a/reorgdetector/types_test.go b/reorgdetector/types_test.go new file mode 100644 index 00000000..9e20e363 --- /dev/null +++ b/reorgdetector/types_test.go @@ -0,0 +1,104 @@ +package reorgdetector + +import ( + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func TestBlockMap(t *testing.T) { + t.Parallel() + + // Create a new block map + bm := newHeadersList( + header{Num: 1, Hash: common.HexToHash("0x123")}, + header{Num: 2, Hash: common.HexToHash("0x456")}, + header{Num: 3, Hash: common.HexToHash("0x789")}, + ) + + t.Run("len", func(t *testing.T) { + t.Parallel() + + actualLen := bm.len() + expectedLen := 3 + if !reflect.DeepEqual(expectedLen, actualLen) { + t.Errorf("len() returned incorrect result, expected: %v, got: %v", expectedLen, actualLen) + } + }) + + t.Run("isEmpty", func(t *testing.T) { + t.Parallel() + + if bm.isEmpty() { + t.Error("isEmpty() returned incorrect result, expected: false, got: true") + } + }) + + t.Run("add", func(t *testing.T) { + t.Parallel() + + bm := bm.copy() + tba := header{Num: 4, Hash: common.HexToHash("0xabc")} + bm.add(tba) + if !reflect.DeepEqual(tba, bm.headers[4]) { + t.Errorf("add() returned incorrect result, expected: %v, got: %v", tba, bm.headers[4]) + } + }) + + t.Run("copy", func(t *testing.T) { + t.Parallel() + + copiedBm := bm.copy() + if !reflect.DeepEqual(bm, copiedBm) { + t.Errorf("add() returned incorrect result, expected: %v, got: %v", bm, copiedBm) + } + }) + + t.Run("get", func(t *testing.T) { + t.Parallel() + + if !reflect.DeepEqual(*bm.get(3), bm.headers[3]) { + t.Errorf("get() returned incorrect result, expected: %v, got: %v", bm.get(3), bm.headers[3]) + } + }) + + t.Run("getSorted", func(t *testing.T) { + t.Parallel() + + sortedBlocks := bm.getSorted() + expectedSortedBlocks := []header{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + {Num: 3, Hash: common.HexToHash("0x789")}, + } + if !reflect.DeepEqual(sortedBlocks, expectedSortedBlocks) { + t.Errorf("getSorted() returned incorrect result, expected: %v, got: %v", expectedSortedBlocks, sortedBlocks) + } + }) + + t.Run("removeRange", func(t *testing.T) { + t.Parallel() + + bm := newHeadersList( + header{Num: 1, Hash: common.HexToHash("0x123")}, + header{Num: 2, Hash: common.HexToHash("0x456")}, + header{Num: 3, Hash: common.HexToHash("0x789")}, + header{Num: 4, Hash: common.HexToHash("0xabc")}, + header{Num: 5, Hash: common.HexToHash("0xdef")}, + ) + + bm.removeRange(3, 5) + + expectedBlocks := []header{ + {Num: 1, Hash: common.HexToHash("0x123")}, + {Num: 2, Hash: common.HexToHash("0x456")}, + } + + sortedBlocks := bm.getSorted() + + if !reflect.DeepEqual(sortedBlocks, expectedBlocks) { + t.Errorf("removeRange() failed, expected: %v, got: %v", expectedBlocks, sortedBlocks) + } + }) +} diff --git a/sync/evmdriver.go b/sync/evmdriver.go index 7f782939..2edd2e15 100644 --- a/sync/evmdriver.go +++ b/sync/evmdriver.go @@ -93,7 +93,7 @@ reset: case b := <-downloadCh: d.log.Debug("handleNewBlock") d.handleNewBlock(ctx, b) - case firstReorgedBlock := <-d.reorgSub.FirstReorgedBlock: + case firstReorgedBlock := <-d.reorgSub.ReorgedBlock: d.log.Debug("handleReorg") d.handleReorg(ctx, cancel, downloadCh, firstReorgedBlock) goto reset diff --git a/sync/evmdriver_test.go b/sync/evmdriver_test.go index 5b1abbfe..74692321 100644 --- a/sync/evmdriver_test.go +++ b/sync/evmdriver_test.go @@ -29,8 +29,8 @@ func TestSync(t *testing.T) { firstReorgedBlock := make(chan uint64) reorgProcessed := make(chan bool) rdm.On("Subscribe", reorgDetectorID).Return(&reorgdetector.Subscription{ - FirstReorgedBlock: firstReorgedBlock, - ReorgProcessed: reorgProcessed, + ReorgedBlock: firstReorgedBlock, + ReorgProcessed: reorgProcessed, }, nil) driver, err := NewEVMDriver(rdm, pm, dm, reorgDetectorID, 10, rh) require.NoError(t, err) diff --git a/sync/mock_downloader_test.go b/sync/mock_downloader_test.go index 1cd476ad..c965efb6 100644 --- a/sync/mock_downloader_test.go +++ b/sync/mock_downloader_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package sync @@ -23,6 +23,10 @@ func (_m *EVMDownloaderMock) Download(ctx context.Context, fromBlock uint64, dow func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64) EVMBlockHeader { ret := _m.Called(ctx, blockNum) + if len(ret) == 0 { + panic("no return value specified for GetBlockHeader") + } + var r0 EVMBlockHeader if rf, ok := ret.Get(0).(func(context.Context, uint64) EVMBlockHeader); ok { r0 = rf(ctx, blockNum) @@ -37,6 +41,10 @@ func (_m *EVMDownloaderMock) GetBlockHeader(ctx context.Context, blockNum uint64 func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) []EVMBlock { ret := _m.Called(ctx, fromBlock, toBlock) + if len(ret) == 0 { + panic("no return value specified for GetEventsByBlockRange") + } + var r0 []EVMBlock if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []EVMBlock); ok { r0 = rf(ctx, fromBlock, toBlock) @@ -53,6 +61,10 @@ func (_m *EVMDownloaderMock) GetEventsByBlockRange(ctx context.Context, fromBloc func (_m *EVMDownloaderMock) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64) []types.Log { ret := _m.Called(ctx, fromBlock, toBlock) + if len(ret) == 0 { + panic("no return value specified for GetLogs") + } + var r0 []types.Log if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64) []types.Log); ok { r0 = rf(ctx, fromBlock, toBlock) @@ -69,6 +81,10 @@ func (_m *EVMDownloaderMock) GetLogs(ctx context.Context, fromBlock uint64, toBl func (_m *EVMDownloaderMock) WaitForNewBlocks(ctx context.Context, lastBlockSeen uint64) uint64 { ret := _m.Called(ctx, lastBlockSeen) + if len(ret) == 0 { + panic("no return value specified for WaitForNewBlocks") + } + var r0 uint64 if rf, ok := ret.Get(0).(func(context.Context, uint64) uint64); ok { r0 = rf(ctx, lastBlockSeen) @@ -79,13 +95,12 @@ func (_m *EVMDownloaderMock) WaitForNewBlocks(ctx context.Context, lastBlockSeen return r0 } -type mockConstructorTestingTNewEVMDownloaderMock interface { +// NewEVMDownloaderMock creates a new instance of EVMDownloaderMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEVMDownloaderMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewEVMDownloaderMock creates a new instance of EVMDownloaderMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEVMDownloaderMock(t mockConstructorTestingTNewEVMDownloaderMock) *EVMDownloaderMock { +}) *EVMDownloaderMock { mock := &EVMDownloaderMock{} mock.Mock.Test(t) diff --git a/sync/mock_l2_test.go b/sync/mock_l2_test.go index 0d1e03da..78d75191 100644 --- a/sync/mock_l2_test.go +++ b/sync/mock_l2_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package sync @@ -24,6 +24,10 @@ type L2Mock struct { func (_m *L2Mock) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { ret := _m.Called(ctx, hash) + if len(ret) == 0 { + panic("no return value specified for BlockByHash") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Block, error)); ok { @@ -50,6 +54,10 @@ func (_m *L2Mock) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo func (_m *L2Mock) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for BlockByNumber") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Block, error)); ok { @@ -76,6 +84,10 @@ func (_m *L2Mock) BlockByNumber(ctx context.Context, number *big.Int) (*types.Bl func (_m *L2Mock) BlockNumber(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for BlockNumber") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { @@ -100,6 +112,10 @@ func (_m *L2Mock) BlockNumber(ctx context.Context) (uint64, error) { func (_m *L2Mock) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { ret := _m.Called(ctx, call, blockNumber) + if len(ret) == 0 { + panic("no return value specified for CallContract") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg, *big.Int) ([]byte, error)); ok { @@ -126,6 +142,10 @@ func (_m *L2Mock) CallContract(ctx context.Context, call ethereum.CallMsg, block func (_m *L2Mock) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { ret := _m.Called(ctx, contract, blockNumber) + if len(ret) == 0 { + panic("no return value specified for CodeAt") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]byte, error)); ok { @@ -152,6 +172,10 @@ func (_m *L2Mock) CodeAt(ctx context.Context, contract common.Address, blockNumb func (_m *L2Mock) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error) { ret := _m.Called(ctx, call) + if len(ret) == 0 { + panic("no return value specified for EstimateGas") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.CallMsg) (uint64, error)); ok { @@ -176,6 +200,10 @@ func (_m *L2Mock) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint6 func (_m *L2Mock) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { ret := _m.Called(ctx, q) + if len(ret) == 0 { + panic("no return value specified for FilterLogs") + } + var r0 []types.Log var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery) ([]types.Log, error)); ok { @@ -202,6 +230,10 @@ func (_m *L2Mock) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]typ func (_m *L2Mock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { ret := _m.Called(ctx, hash) + if len(ret) == 0 { + panic("no return value specified for HeaderByHash") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.Header, error)); ok { @@ -228,6 +260,10 @@ func (_m *L2Mock) HeaderByHash(ctx context.Context, hash common.Hash) (*types.He func (_m *L2Mock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { @@ -254,6 +290,10 @@ func (_m *L2Mock) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H func (_m *L2Mock) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { ret := _m.Called(ctx, account) + if len(ret) == 0 { + panic("no return value specified for PendingCodeAt") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address) ([]byte, error)); ok { @@ -280,6 +320,10 @@ func (_m *L2Mock) PendingCodeAt(ctx context.Context, account common.Address) ([] func (_m *L2Mock) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { ret := _m.Called(ctx, account) + if len(ret) == 0 { + panic("no return value specified for PendingNonceAt") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Address) (uint64, error)); ok { @@ -304,6 +348,10 @@ func (_m *L2Mock) PendingNonceAt(ctx context.Context, account common.Address) (u func (_m *L2Mock) SendTransaction(ctx context.Context, tx *types.Transaction) error { ret := _m.Called(ctx, tx) + if len(ret) == 0 { + panic("no return value specified for SendTransaction") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) error); ok { r0 = rf(ctx, tx) @@ -318,6 +366,10 @@ func (_m *L2Mock) SendTransaction(ctx context.Context, tx *types.Transaction) er func (_m *L2Mock) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { ret := _m.Called(ctx, q, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeFilterLogs") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error)); ok { @@ -344,6 +396,10 @@ func (_m *L2Mock) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuer func (_m *L2Mock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { ret := _m.Called(ctx, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { @@ -370,6 +426,10 @@ func (_m *L2Mock) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) func (_m *L2Mock) SuggestGasPrice(ctx context.Context) (*big.Int, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SuggestGasPrice") + } + var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { @@ -396,6 +456,10 @@ func (_m *L2Mock) SuggestGasPrice(ctx context.Context) (*big.Int, error) { func (_m *L2Mock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SuggestGasTipCap") + } + var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*big.Int, error)); ok { @@ -422,6 +486,10 @@ func (_m *L2Mock) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (_m *L2Mock) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { ret := _m.Called(ctx, blockHash) + if len(ret) == 0 { + panic("no return value specified for TransactionCount") + } + var r0 uint var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (uint, error)); ok { @@ -446,6 +514,10 @@ func (_m *L2Mock) TransactionCount(ctx context.Context, blockHash common.Hash) ( func (_m *L2Mock) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { ret := _m.Called(ctx, blockHash, index) + if len(ret) == 0 { + panic("no return value specified for TransactionInBlock") + } + var r0 *types.Transaction var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash, uint) (*types.Transaction, error)); ok { @@ -468,13 +540,12 @@ func (_m *L2Mock) TransactionInBlock(ctx context.Context, blockHash common.Hash, return r0, r1 } -type mockConstructorTestingTNewL2Mock interface { +// NewL2Mock creates a new instance of L2Mock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewL2Mock(t interface { mock.TestingT Cleanup(func()) -} - -// NewL2Mock creates a new instance of L2Mock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewL2Mock(t mockConstructorTestingTNewL2Mock) *L2Mock { +}) *L2Mock { mock := &L2Mock{} mock.Mock.Test(t) diff --git a/sync/mock_processor_test.go b/sync/mock_processor_test.go index 19738ef5..8e562e9b 100644 --- a/sync/mock_processor_test.go +++ b/sync/mock_processor_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package sync @@ -17,6 +17,10 @@ type ProcessorMock struct { func (_m *ProcessorMock) GetLastProcessedBlock(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for GetLastProcessedBlock") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { @@ -41,6 +45,10 @@ func (_m *ProcessorMock) GetLastProcessedBlock(ctx context.Context) (uint64, err func (_m *ProcessorMock) ProcessBlock(ctx context.Context, block Block) error { ret := _m.Called(ctx, block) + if len(ret) == 0 { + panic("no return value specified for ProcessBlock") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, Block) error); ok { r0 = rf(ctx, block) @@ -55,6 +63,10 @@ func (_m *ProcessorMock) ProcessBlock(ctx context.Context, block Block) error { func (_m *ProcessorMock) Reorg(ctx context.Context, firstReorgedBlock uint64) error { ret := _m.Called(ctx, firstReorgedBlock) + if len(ret) == 0 { + panic("no return value specified for Reorg") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { r0 = rf(ctx, firstReorgedBlock) @@ -65,13 +77,12 @@ func (_m *ProcessorMock) Reorg(ctx context.Context, firstReorgedBlock uint64) er return r0 } -type mockConstructorTestingTNewProcessorMock interface { +// NewProcessorMock creates a new instance of ProcessorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewProcessorMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewProcessorMock creates a new instance of ProcessorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewProcessorMock(t mockConstructorTestingTNewProcessorMock) *ProcessorMock { +}) *ProcessorMock { mock := &ProcessorMock{} mock.Mock.Test(t) diff --git a/sync/mock_reorgdetector_test.go b/sync/mock_reorgdetector_test.go index 056da2a1..52cd0cd0 100644 --- a/sync/mock_reorgdetector_test.go +++ b/sync/mock_reorgdetector_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package sync @@ -21,6 +21,10 @@ type ReorgDetectorMock struct { func (_m *ReorgDetectorMock) AddBlockToTrack(ctx context.Context, id string, blockNum uint64, blockHash common.Hash) error { ret := _m.Called(ctx, id, blockNum, blockHash) + if len(ret) == 0 { + panic("no return value specified for AddBlockToTrack") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, uint64, common.Hash) error); ok { r0 = rf(ctx, id, blockNum, blockHash) @@ -35,6 +39,10 @@ func (_m *ReorgDetectorMock) AddBlockToTrack(ctx context.Context, id string, blo func (_m *ReorgDetectorMock) Subscribe(id string) (*reorgdetector.Subscription, error) { ret := _m.Called(id) + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + var r0 *reorgdetector.Subscription var r1 error if rf, ok := ret.Get(0).(func(string) (*reorgdetector.Subscription, error)); ok { @@ -57,13 +65,12 @@ func (_m *ReorgDetectorMock) Subscribe(id string) (*reorgdetector.Subscription, return r0, r1 } -type mockConstructorTestingTNewReorgDetectorMock interface { +// NewReorgDetectorMock creates a new instance of ReorgDetectorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReorgDetectorMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewReorgDetectorMock creates a new instance of ReorgDetectorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewReorgDetectorMock(t mockConstructorTestingTNewReorgDetectorMock) *ReorgDetectorMock { +}) *ReorgDetectorMock { mock := &ReorgDetectorMock{} mock.Mock.Test(t) diff --git a/test/helpers/aggoracle_e2e.go b/test/helpers/aggoracle_e2e.go index c2908b8d..fdde39dd 100644 --- a/test/helpers/aggoracle_e2e.go +++ b/test/helpers/aggoracle_e2e.go @@ -101,7 +101,7 @@ func CommonSetup(t *testing.T) ( require.NoError(t, err) // Reorg detector dbPathReorgDetector := t.TempDir() - reorg, err := reorgdetector.New(ctx, l1Client.Client(), dbPathReorgDetector) + reorg, err := reorgdetector.New(l1Client.Client(), reorgdetector.Config{DBPath: dbPathReorgDetector}) require.NoError(t, err) // Syncer dbPathSyncer := t.TempDir() diff --git a/test/helpers/mock_ethtxmanager.go b/test/helpers/mock_ethtxmanager.go index 995084a2..848992f4 100644 --- a/test/helpers/mock_ethtxmanager.go +++ b/test/helpers/mock_ethtxmanager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.22.1. DO NOT EDIT. +// Code generated by mockery v2.40.1. DO NOT EDIT. package helpers @@ -25,6 +25,10 @@ type EthTxManagerMock struct { func (_m *EthTxManagerMock) Add(ctx context.Context, to *common.Address, forcedNonce *uint64, value *big.Int, data []byte, gasOffset uint64, sidecar *types.BlobTxSidecar) (common.Hash, error) { ret := _m.Called(ctx, to, forcedNonce, value, data, gasOffset, sidecar) + if len(ret) == 0 { + panic("no return value specified for Add") + } + var r0 common.Hash var r1 error if rf, ok := ret.Get(0).(func(context.Context, *common.Address, *uint64, *big.Int, []byte, uint64, *types.BlobTxSidecar) (common.Hash, error)); ok { @@ -51,6 +55,10 @@ func (_m *EthTxManagerMock) Add(ctx context.Context, to *common.Address, forcedN func (_m *EthTxManagerMock) Remove(ctx context.Context, id common.Hash) error { ret := _m.Called(ctx, id) + if len(ret) == 0 { + panic("no return value specified for Remove") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) error); ok { r0 = rf(ctx, id) @@ -65,6 +73,10 @@ func (_m *EthTxManagerMock) Remove(ctx context.Context, id common.Hash) error { func (_m *EthTxManagerMock) Result(ctx context.Context, id common.Hash) (ethtxmanager.MonitoredTxResult, error) { ret := _m.Called(ctx, id) + if len(ret) == 0 { + panic("no return value specified for Result") + } + var r0 ethtxmanager.MonitoredTxResult var r1 error if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (ethtxmanager.MonitoredTxResult, error)); ok { @@ -89,6 +101,10 @@ func (_m *EthTxManagerMock) Result(ctx context.Context, id common.Hash) (ethtxma func (_m *EthTxManagerMock) ResultsByStatus(ctx context.Context, statuses []ethtxmanager.MonitoredTxStatus) ([]ethtxmanager.MonitoredTxResult, error) { ret := _m.Called(ctx, statuses) + if len(ret) == 0 { + panic("no return value specified for ResultsByStatus") + } + var r0 []ethtxmanager.MonitoredTxResult var r1 error if rf, ok := ret.Get(0).(func(context.Context, []ethtxmanager.MonitoredTxStatus) ([]ethtxmanager.MonitoredTxResult, error)); ok { @@ -111,13 +127,12 @@ func (_m *EthTxManagerMock) ResultsByStatus(ctx context.Context, statuses []etht return r0, r1 } -type mockConstructorTestingTNewEthTxManagerMock interface { +// NewEthTxManagerMock creates a new instance of EthTxManagerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEthTxManagerMock(t interface { mock.TestingT Cleanup(func()) -} - -// NewEthTxManagerMock creates a new instance of EthTxManagerMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEthTxManagerMock(t mockConstructorTestingTNewEthTxManagerMock) *EthTxManagerMock { +}) *EthTxManagerMock { mock := &EthTxManagerMock{} mock.Mock.Test(t)