From acb1a3e0756a1997b683e6f5252676dc99a231b0 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Mon, 22 Jul 2024 12:39:27 +0200 Subject: [PATCH] feat: removeTrackedBlocks UT --- reorgdetector/reorgdetector.go | 27 +++++++++----- reorgdetector/reorgdetector_test.go | 55 +++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/reorgdetector/reorgdetector.go b/reorgdetector/reorgdetector.go index 8f35e82e..59cd7673 100644 --- a/reorgdetector/reorgdetector.go +++ b/reorgdetector/reorgdetector.go @@ -21,8 +21,8 @@ import ( // the client will have at least as many blocks as it had before the reorg, however this may not be the case for L2 const ( - waitPeriodBlockRemover = time.Second * 20 - waitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain + defaultWaitPeriodBlockRemover = time.Second * 20 + defaultWaitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain subscriberBlocks = "reorgdetector-subscriberBlocks" @@ -94,7 +94,7 @@ func (bm blockMap) getFromBlockSorted(blockNum uint64) []block { newBlocks := make([]block, 0, numOfBlocksToLeave) for i := numOfBlocks - numOfBlocksToLeave; i < numOfBlocks; i++ { - if sortedBlocks[i].Num < lastBlock { + if sortedBlocks[i].Num < blockNum { // skip blocks that are finalised continue } @@ -145,6 +145,9 @@ type ReorgDetector struct { trackedBlocks map[string]blockMap db kv.RwDB + + waitPeriodBlockRemover time.Duration + waitPeriodBlockAdder time.Duration } // New creates a new instance of ReorgDetector @@ -162,10 +165,18 @@ func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector, // newReorgDetector creates a new instance of ReorgDetector func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) { + return newReorgDetectorWithPeriods(ctx, client, db, defaultWaitPeriodBlockRemover, defaultWaitPeriodBlockAdder) +} + +// newReorgDetectorWithPeriods creates a new instance of ReorgDetector with custom wait periods +func newReorgDetectorWithPeriods(ctx context.Context, client EthClient, db kv.RwDB, + waitPeriodBlockRemover, waitPeriodBlockAdder time.Duration) (*ReorgDetector, error) { r := &ReorgDetector{ - ethClient: client, - db: db, - subscriptions: make(map[string]*Subscription, 0), + ethClient: client, + db: db, + subscriptions: make(map[string]*Subscription, 0), + waitPeriodBlockRemover: waitPeriodBlockRemover, + waitPeriodBlockAdder: waitPeriodBlockAdder, } trackedBlocks, err := r.getTrackedBlocks(ctx) @@ -299,7 +310,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi } func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { - ticker := time.NewTicker(waitPeriodBlockRemover) + ticker := time.NewTicker(r.waitPeriodBlockRemover) for { select { @@ -325,7 +336,7 @@ func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) { func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) { var ( lastUnfinalisedBlock uint64 - ticker = time.NewTicker(waitPeriodBlockAdder) + ticker = time.NewTicker(r.waitPeriodBlockAdder) unfinalisedBlocksMap = r.getUnfinalisedBlocksMap() ) diff --git a/reorgdetector/reorgdetector_test.go b/reorgdetector/reorgdetector_test.go index 62b6e56b..ae44b924 100644 --- a/reorgdetector/reorgdetector_test.go +++ b/reorgdetector/reorgdetector_test.go @@ -399,6 +399,61 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) { }) } +func TestReorgDetector_removeFinalisedBlocks(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + client := NewEthClientMock(t) + db := newTestDB(t) + + unfinalisedBlocks := createTestBlocks(t, 1, 10) + insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID) + insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber) + + // call for removeFinalisedBlocks + client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return( + &types.Header{Number: big.NewInt(5)}, nil, + ) + + rd := &ReorgDetector{ + ethClient: client, + db: db, + trackedBlocks: make(map[string]blockMap), + waitPeriodBlockRemover: 100 * time.Millisecond, + waitPeriodBlockAdder: 100 * time.Millisecond, + subscriptions: map[string]*Subscription{ + testSubscriber: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + unfalisedBlocksID: { + FirstReorgedBlock: make(chan uint64), + ReorgProcessed: make(chan bool), + }, + }, + } + + trackedBlocks, err := rd.getTrackedBlocks(ctx) + require.NoError(t, err) + require.Len(t, trackedBlocks, 2) + + rd.trackedBlocks = trackedBlocks + + // make sure we have all blocks in the tracked blocks before removing finalized blocks + require.Len(t, rd.trackedBlocks[unfalisedBlocksID], len(unfinalisedBlocks)) + require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks)) + + // remove finalized blocks + go rd.removeFinalisedBlocks(ctx) + + time.Sleep(3 * time.Second) // wait for the go routine to remove the finalized blocks + cancel() + + // make sure all blocks are removed from the tracked blocks + require.Len(t, rd.trackedBlocks[unfalisedBlocksID], 5) + require.Len(t, rd.trackedBlocks[testSubscriber], 5) +} + func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header { t.Helper()