Skip to content

Commit

Permalink
Integrate reorg detector to localbridgesync
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Jul 3, 2024
1 parent 2386219 commit 7caed89
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 98 deletions.
107 changes: 14 additions & 93 deletions localbridgesync/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@ package localbridgesync
import (
"context"
"errors"
"fmt"
"time"

"github.com/0xPolygon/cdk/reorgdetector"
)

const (
checkReorgInterval = time.Second * 10
downloadBufferSize = 100
reorgDetectorID = "localbridgesync"
)

type driver struct {
p *processor
d *downloader
reorgDetector *reorgdetector.ReorgDetector
reorgSub *reorgdetector.Subscription
p *processor
d *downloader
}

func newDriver() (*driver, error) {
func newDriver(r *reorgdetector.ReorgDetector) (*driver, error) {
r.Subscribe(reorgDetectorID)
return nil, errors.New("not implemented")
}

Expand All @@ -35,25 +40,21 @@ func (d *driver) Sync(ctx context.Context) {
downloadCh := make(chan block, downloadBufferSize)
go d.d.download(cancellableCtx, lastProcessedBlock, downloadCh)

// detect potential reorgs
reorgCh := make(chan uint64)
go d.detectReorg(cancellableCtx, reorgCh)

for {
if shouldRestartSync := d.syncIteration(cancel, downloadCh, reorgCh); shouldRestartSync {
if shouldRestartSync := d.syncIteration(ctx, cancel, downloadCh); shouldRestartSync {
break
}
}
}
}

func (d *driver) syncIteration(
cancel context.CancelFunc, downloadCh chan block, reorgCh chan uint64,
ctx context.Context, cancel context.CancelFunc, downloadCh chan block,
) (shouldRestartSync bool) {
shouldRestartSync = false
select {
case b := <-downloadCh: // new block from downloader
err := d.storeBlockToReorgTracker(b.blockHeader)
err := d.reorgDetector.AddBlockToTrack(ctx, reorgDetectorID, b.Num, b.Hash)
if err != nil {
// TODO: handle error
return
Expand All @@ -63,7 +64,7 @@ func (d *driver) syncIteration(
// TODO: handle error
return
}
case lastValidBlock := <-reorgCh: // reorg detected
case lastValidBlock := <-d.reorgSub.FirstReorgedBlock: // reorg detected
// stop downloader
cancel()
// wait until downloader closes channel
Expand All @@ -77,90 +78,10 @@ func (d *driver) syncIteration(
// TODO: handle error
return
}
d.reorgSub.ReorgProcessed <- true

// restart syncing
shouldRestartSync = true
}
return
}

// IMO we could make a package "reorg detector" that could be reused by all the syncers
// each instance should use it's own storage / bucket

func (d *driver) detectReorg(ctx context.Context, reorgCh chan uint64) {
var (
expectedHeader blockHeader
err error
)
for {
expectedHeader, err = d.getGreatestBlockFromReorgTracker()
if err != nil {
// TODO: handle error
return
}
actualHeader, err := d.d.getBlockHeader(ctx, expectedHeader.Num)
if err != nil {
// TODO: handle error
return
}
if actualHeader.Hash != expectedHeader.Hash {
fmt.Printf("reorg detected")
break
}
time.Sleep(checkReorgInterval)
}
// Find last valid block
oldestTrackedBlock, err := d.getSmallestBlockFromReorgTracker()
if err != nil {
// TODO: handle error
return
}
lastValidBlock := oldestTrackedBlock.Num
for i := expectedHeader.Num - 1; i > lastValidBlock; i-- {
expectedHeader, err = d.getBlockFromReorgTracker(i)
if err != nil {
// TODO: handle error
return
}
actualHeader, err := d.d.getBlockHeader(ctx, expectedHeader.Num)
if err != nil {
// TODO: handle error
return
}
if actualHeader.Hash == expectedHeader.Hash {
fmt.Printf("valid block detected")
reorgCh <- actualHeader.Num
return
}
}
// this should never happen! But if it happens it should delete ALL the data from the processor
// or have logic to handle this "special case"
fmt.Printf("no valid block detected!!!!")
reorgCh <- oldestTrackedBlock.Num
return
}

func (d *driver) storeBlockToReorgTracker(b blockHeader) error {
fmt.Printf("adding block %d with hash %s to the reorg tracker storage\n", b.Num, b.Hash)
return errors.New("not implemented")
}

func (d *driver) removeBlockFromReorgTracker(blockNum uint64) error {
fmt.Printf("removing block %d from the reorg tracker storage\n", blockNum)
return errors.New("not implemented")
}

func (d *driver) getGreatestBlockFromReorgTracker() (blockHeader, error) {
fmt.Println("getting the block with the greatest block num from the reorg tracker storage")
return blockHeader{}, errors.New("not implemented")
}

func (d *driver) getBlockFromReorgTracker(blockNum uint64) (blockHeader, error) {
fmt.Printf("getting the block %d from the reorg tracker storage", blockNum)
return blockHeader{}, errors.New("not implemented")
}

func (d *driver) getSmallestBlockFromReorgTracker() (blockHeader, error) {
fmt.Println("getting the block with the smallest block num from the reorg tracker storage")
return blockHeader{}, errors.New("not implemented")
}
10 changes: 5 additions & 5 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
ErrInavlidBlockHash = errors.New("the block hash does not match with the expected block hash")
)

type subscription struct {
type Subscription struct {
FirstReorgedBlock chan uint64
ReorgProcessed chan bool
mu sync.Mutex
Expand All @@ -38,7 +38,7 @@ type ReorgDetector struct {
unfinalisedBlocks map[uint64]common.Hash
trackedBlocks map[string]map[uint64]common.Hash // TODO: needs persistance! needs to be able to iterate in order!
// the channel is used to notify first invalid block
subscriptions map[string]*subscription
subscriptions map[string]*Subscription
}

func New(ctx context.Context) (*ReorgDetector, error) {
Expand All @@ -63,11 +63,11 @@ func (r *ReorgDetector) Start(ctx context.Context) {
go r.addUnfinalisedBlocks(ctx, lastFinalisedBlock+1)
}

func (r *ReorgDetector) Subscribe(id string) *subscription {
func (r *ReorgDetector) Subscribe(id string) *Subscription {
if sub, ok := r.subscriptions[id]; ok {
return sub
}
sub := &subscription{
sub := &Subscription{
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, blockNum

func (r *ReorgDetector) cleanStoredSubsBeforeStart(ctx context.Context, latestFinalisedBlock uint64) error {
for id := range r.trackedBlocks {
sub := &subscription{
sub := &Subscription{
FirstReorgedBlock: make(chan uint64),
ReorgProcessed: make(chan bool),
}
Expand Down

0 comments on commit 7caed89

Please sign in to comment.