Skip to content

Commit

Permalink
feat: removeTrackedBlocks UT
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jul 22, 2024
1 parent c06b58a commit acb1a3e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
27 changes: 19 additions & 8 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
// the client will have at least as many blocks as it had before the reorg, however this may not be the case for L2

const (
waitPeriodBlockRemover = time.Second * 20
waitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain
defaultWaitPeriodBlockRemover = time.Second * 20
defaultWaitPeriodBlockAdder = time.Second * 2 // should be smaller than block time of the tracked chain

subscriberBlocks = "reorgdetector-subscriberBlocks"

Expand Down Expand Up @@ -94,7 +94,7 @@ func (bm blockMap) getFromBlockSorted(blockNum uint64) []block {

newBlocks := make([]block, 0, numOfBlocksToLeave)
for i := numOfBlocks - numOfBlocksToLeave; i < numOfBlocks; i++ {
if sortedBlocks[i].Num < lastBlock {
if sortedBlocks[i].Num < blockNum {
// skip blocks that are finalised
continue
}
Expand Down Expand Up @@ -145,6 +145,9 @@ type ReorgDetector struct {
trackedBlocks map[string]blockMap

db kv.RwDB

waitPeriodBlockRemover time.Duration
waitPeriodBlockAdder time.Duration
}

// New creates a new instance of ReorgDetector
Expand All @@ -162,10 +165,18 @@ func New(ctx context.Context, client EthClient, dbPath string) (*ReorgDetector,

// newReorgDetector creates a new instance of ReorgDetector
func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*ReorgDetector, error) {
return newReorgDetectorWithPeriods(ctx, client, db, defaultWaitPeriodBlockRemover, defaultWaitPeriodBlockAdder)
}

// newReorgDetectorWithPeriods creates a new instance of ReorgDetector with custom wait periods
func newReorgDetectorWithPeriods(ctx context.Context, client EthClient, db kv.RwDB,
waitPeriodBlockRemover, waitPeriodBlockAdder time.Duration) (*ReorgDetector, error) {
r := &ReorgDetector{
ethClient: client,
db: db,
subscriptions: make(map[string]*Subscription, 0),
ethClient: client,
db: db,
subscriptions: make(map[string]*Subscription, 0),
waitPeriodBlockRemover: waitPeriodBlockRemover,
waitPeriodBlockAdder: waitPeriodBlockAdder,
}

trackedBlocks, err := r.getTrackedBlocks(ctx)
Expand Down Expand Up @@ -299,7 +310,7 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi
}

func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) {
ticker := time.NewTicker(waitPeriodBlockRemover)
ticker := time.NewTicker(r.waitPeriodBlockRemover)

for {
select {
Expand All @@ -325,7 +336,7 @@ func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) {
func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) {
var (
lastUnfinalisedBlock uint64
ticker = time.NewTicker(waitPeriodBlockAdder)
ticker = time.NewTicker(r.waitPeriodBlockAdder)
unfinalisedBlocksMap = r.getUnfinalisedBlocksMap()
)

Expand Down
55 changes: 55 additions & 0 deletions reorgdetector/reorgdetector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,61 @@ func TestReorgDetector_AddBlockToTrack(t *testing.T) {
})
}

func TestReorgDetector_removeFinalisedBlocks(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
client := NewEthClientMock(t)
db := newTestDB(t)

unfinalisedBlocks := createTestBlocks(t, 1, 10)
insertTestData(t, ctx, db, unfinalisedBlocks, unfalisedBlocksID)
insertTestData(t, ctx, db, unfinalisedBlocks, testSubscriber)

// call for removeFinalisedBlocks
client.On("HeaderByNumber", ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))).Return(
&types.Header{Number: big.NewInt(5)}, nil,
)

rd := &ReorgDetector{
ethClient: client,
db: db,
trackedBlocks: make(map[string]blockMap),
waitPeriodBlockRemover: 100 * time.Millisecond,
waitPeriodBlockAdder: 100 * time.Millisecond,
subscriptions: map[string]*Subscription{
testSubscriber: {
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
},
unfalisedBlocksID: {
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
},
},
}

trackedBlocks, err := rd.getTrackedBlocks(ctx)
require.NoError(t, err)
require.Len(t, trackedBlocks, 2)

rd.trackedBlocks = trackedBlocks

// make sure we have all blocks in the tracked blocks before removing finalized blocks
require.Len(t, rd.trackedBlocks[unfalisedBlocksID], len(unfinalisedBlocks))
require.Len(t, rd.trackedBlocks[testSubscriber], len(unfinalisedBlocks))

// remove finalized blocks
go rd.removeFinalisedBlocks(ctx)

time.Sleep(3 * time.Second) // wait for the go routine to remove the finalized blocks
cancel()

// make sure all blocks are removed from the tracked blocks
require.Len(t, rd.trackedBlocks[unfalisedBlocksID], 5)
require.Len(t, rd.trackedBlocks[testSubscriber], 5)
}

func createTestBlocks(t *testing.T, startBlock uint64, count uint64) []*types.Header {
t.Helper()

Expand Down

0 comments on commit acb1a3e

Please sign in to comment.