Skip to content

Commit

Permalink
fix: addUnfinilisedBlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jul 19, 2024
1 parent 396258d commit 94a3f4c
Showing 1 changed file with 98 additions and 85 deletions.
183 changes: 98 additions & 85 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,27 @@ func (bm blockMap) getFromBlockSorted(blockNum uint64) []block {
return sortedBlocks
}

// getClosestHigherBlock returns the closest higher block to the given blockNum
func (bm blockMap) getClosestHigherBlock(blockNum uint64) block {
if block, ok := bm[blockNum]; ok {
return block
}

sorted := bm.getFromBlockSorted(blockNum)
if len(sorted) == 0 {
return block{}
}

return sorted[0]
}

// removeRange removes blocks from from to to
func (bm blockMap) removeRange(from, to uint64) {
for i := from; i <= to; i++ {
delete(bm, i)
}
}

type Subscription struct {
FirstReorgedBlock chan uint64
ReorgProcessed chan bool
Expand Down Expand Up @@ -167,18 +188,8 @@ func newReorgDetector(ctx context.Context, client EthClient, db kv.RwDB) (*Reorg
}

func (r *ReorgDetector) Start(ctx context.Context) {
unfinalisedBlocks, ok := r.trackedBlocks[unfalisedBlocksID]
if !ok {
unfinalisedBlocks = newBlockMap()
}

var lastUnfinalisedBlock uint64
if len(unfinalisedBlocks) > 0 {
lastUnfinalisedBlock = unfinalisedBlocks.getSorted()[len(unfinalisedBlocks)-1].Num
}

go r.removeFinalisedBlocks(ctx)
go r.addUnfinalisedBlocks(ctx, lastUnfinalisedBlock+1)
go r.addUnfinalisedBlocks(ctx)
}

func (r *ReorgDetector) Subscribe(id string) (*Subscription, error) {
Expand Down Expand Up @@ -267,17 +278,18 @@ func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFi

if actualBlockHash != block.Hash {
// reorg detected, notify subscriber
if id != unfalisedBlocksID {
r.subscriptions[id].pendingReorgsToBeProcessed.Add(1)
go r.notifyReorgToSubscription(id, block.Num, lastTrackedBlock)
}
go r.notifyReorgToSubscription(id, block.Num)

// remove the reorged blocks from the tracked blocks
blocks.removeRange(block.Num, lastTrackedBlock)

break
} else if block.Num <= latestFinalisedBlock {
delete(blocks, block.Num)
}
}

// if we processed finalized or reorged blocks, update the tracked blocks in memory and db
if err := r.updateTrackedBlocksNoLock(ctx, id, blocks); err != nil {
return err
}
Expand Down Expand Up @@ -310,84 +322,92 @@ func (r *ReorgDetector) removeFinalisedBlocks(ctx context.Context) {
}
}

func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context, initBlock uint64) {
func (r *ReorgDetector) addUnfinalisedBlocks(ctx context.Context) {
var (
firstBlockReorged uint64
err error
lastUnfinalisedBlock uint64
ticker = time.NewTicker(waitPeriodBlockAdder)
unfinalisedBlocksMap = r.getUnfinalisedBlocksMap()
)
currentBlock := initBlock
lastBlockFromClient := currentBlock - 1

if len(unfinalisedBlocksMap) > 0 {
lastUnfinalisedBlock = unfinalisedBlocksMap.getSorted()[len(unfinalisedBlocksMap)-1].Num
}

for {
for currentBlock > lastBlockFromClient {
lastBlockFromClient, err = r.ethClient.BlockNumber(ctx)
select {
case <-ticker.C:
lastBlockFromClient, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber)))
if err != nil {
log.Error("reorg detector - error getting last block number from client", "err", err)
time.Sleep(waitPeriodBlockAdder)

log.Error("reorg detector - error getting last block from client", "err", err)
continue
}

time.Sleep(waitPeriodBlockAdder)
}

header, err := r.ethClient.HeaderByNumber(ctx, big.NewInt(int64(currentBlock)))
if err != nil {
log.Error("reorg detector - error getting block header from client", "err", err)
}
if lastBlockFromClient.Number.Uint64() < lastUnfinalisedBlock {
// a reorg probably happened, and the client has less blocks than we have
// we should wait for the client to catch up so we can be sure
continue
}

unfinalisedBlocks := r.getUnfinalisedBlocksMap()
unfinalisedBlocksMap = r.getUnfinalisedBlocksMap()
if len(unfinalisedBlocksMap) == 0 {
// no unfinalised blocks, just add this block to the map
if err := r.saveTrackedBlock(ctx, unfalisedBlocksID, block{
Num: lastBlockFromClient.Number.Uint64(),
Hash: lastBlockFromClient.Hash(),
}); err != nil {
log.Error("reorg detector - error saving unfinalised block", "block", lastBlockFromClient.Number.Uint64(), "err", err)
}

prevBlock, ok := unfinalisedBlocks[currentBlock-1]
if !ok || header.ParentHash == prevBlock.Hash {
r.saveTrackedBlock(ctx, unfalisedBlocksID, block{header.Number.Uint64(), header.Hash()})
if firstBlockReorged > 0 {
r.notifyReorg(currentBlock, firstBlockReorged)
}
currentBlock++
firstBlockReorged = 0
} else if header.ParentHash != prevBlock.Hash {
// previous block is reorged:
// 1. add a pending reorg to be processed for all the subscribers (so they don't add more blocks)
// 2. remove block
for _, sub := range r.subscriptions {
sub.pendingReorgsToBeProcessed.Add(1)
continue
}

r.removeTrackedBlock(ctx, unfalisedBlocksID, currentBlock-1)
startBlock := lastBlockFromClient
unfinalisedBlocksSorted := unfinalisedBlocksMap.getSorted()
lastReorgBlock := uint64(0)

for i := startBlock.Number.Uint64(); i > unfinalisedBlocksSorted[0].Num; i-- {
previousBlock, ok := unfinalisedBlocksMap[i-1]
if !ok || previousBlock.Hash == startBlock.ParentHash {
unfinalisedBlocksMap[i] = block{Num: startBlock.Number.Uint64(), Hash: startBlock.Hash()}
} else if previousBlock.Hash != startBlock.ParentHash {
// reorg happened, we will find out from where exactly and report this to subscribers
lastReorgBlock = i
}
}

currentBlock--
if firstBlockReorged == 0 {
firstBlockReorged = currentBlock
if lastReorgBlock > 0 {
r.notifyReorgToAllSubscriptions(lastReorgBlock)
} else {
r.updateTrackedBlocks(ctx, unfalisedBlocksID, unfinalisedBlocksMap)
}
case <-ctx.Done():
return
}
}
}

func (r *ReorgDetector) notifyReorg(fromBlock, toBlock uint64) {
func (r *ReorgDetector) notifyReorgToAllSubscriptions(reorgBlock uint64) {
for id := range r.subscriptions {
go r.notifyReorgToSubscription(id, fromBlock, toBlock)
r.trackedBlocksLock.RLock()
subscriberBlocks := r.trackedBlocks[id]
r.trackedBlocksLock.RUnlock()

go r.notifyReorgToSubscription(id, subscriberBlocks.getClosestHigherBlock(reorgBlock).Num)
}
}

func (r *ReorgDetector) notifyReorgToSubscription(id string, fromBlock, toBlock uint64) {
r.trackedBlocksLock.RLock()
blocks := r.trackedBlocks[id]
r.trackedBlocksLock.RUnlock()
func (r *ReorgDetector) notifyReorgToSubscription(id string, reorgBlock uint64) {
if id == unfalisedBlocksID {
return // unfinalised blocks are not subscribers
}

sub := r.subscriptions[id]
var found bool
for i := fromBlock; i <= toBlock; i++ {
if _, ok := blocks[i]; ok {
if !found {
// notify about the first reorged block that was tracked
// and wait for the receiver to process
found = true
sub.FirstReorgedBlock <- i
<-sub.ReorgProcessed
}
delete(blocks, i)
}
}
sub.pendingReorgsToBeProcessed.Add(1)

// notify about the first reorged block that was tracked
// and wait for the receiver to process
sub.FirstReorgedBlock <- reorgBlock
<-sub.ReorgProcessed

sub.pendingReorgsToBeProcessed.Done()
}
Expand Down Expand Up @@ -467,21 +487,6 @@ func (r *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b block
return tx.Put(subscriberBlocks, []byte(id), raw)
}

// removeTrackedBlock removes the tracked block for a subscriber in db and in memory
func (r *ReorgDetector) removeTrackedBlock(ctx context.Context, id string, blockNum uint64) error {
r.trackedBlocksLock.Lock()
defer r.trackedBlocksLock.Unlock()

subscriberBlockMap, ok := r.trackedBlocks[id]
if !ok {
return nil
}

delete(subscriberBlockMap, blockNum)

return r.updateTrackedBlocksNoLock(ctx, id, subscriberBlockMap)
}

// removeTrackedBlocks removes the tracked blocks for a subscriber in db and in memory
func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBlock uint64) error {
r.trackedBlocksLock.Lock()
Expand All @@ -498,6 +503,14 @@ func (r *ReorgDetector) removeTrackedBlocks(ctx context.Context, lastFinalizedBl
return nil
}

// updateTrackedBlocks updates the tracked blocks for a subscriber in db and in memory
func (r *ReorgDetector) updateTrackedBlocks(ctx context.Context, id string, blocks blockMap) error {
r.trackedBlocksLock.Lock()
defer r.trackedBlocksLock.Unlock()

return r.updateTrackedBlocksNoLock(ctx, id, blocks)
}

// updateTrackedBlocksNoLock updates the tracked blocks for a subscriber in db and in memory
func (r *ReorgDetector) updateTrackedBlocksNoLock(ctx context.Context, id string, blocks blockMap) error {
tx, err := r.db.BeginRw(ctx)
Expand Down

0 comments on commit 94a3f4c

Please sign in to comment.