Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracker v2 #180

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 48 additions & 127 deletions blocktracker/blocktracker.go → block-tracker/blocktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ const (

// BlockTracker is an interface to track new blocks on the chain
type BlockTracker struct {
config *Config
blocks []*ethgo.Block
blocksLock sync.Mutex
subscriber BlockTrackerInterface
blockChs []chan *BlockEvent
blockChsLock sync.Mutex
provider BlockProvider
once sync.Once
closeCh chan struct{}
config *Config

blocks []*ethgo.Block
lock sync.Mutex
tracker BlockTrackerInterface
provider BlockProvider

eventBroker *EventBroker
closeCh chan struct{}
}

type Config struct {
Expand Down Expand Up @@ -69,85 +69,43 @@ func NewBlockTracker(provider BlockProvider, opts ...ConfigOption) *BlockTracker
if tracker == nil {
tracker = NewJSONBlockTracker(log.New(os.Stderr, "", log.LstdFlags), provider)
}
return &BlockTracker{
blocks: []*ethgo.Block{},
blockChs: []chan *BlockEvent{},
config: config,
subscriber: tracker,
provider: provider,
closeCh: make(chan struct{}),
}
}

func (b *BlockTracker) Subscribe() chan *BlockEvent {
b.blockChsLock.Lock()
defer b.blockChsLock.Unlock()

ch := make(chan *BlockEvent, 1)
b.blockChs = append(b.blockChs, ch)
return ch
}

func (b *BlockTracker) AcquireLock() Lock {
return Lock{lock: &b.blocksLock}
}

func (t *BlockTracker) Init() (err error) {
var block *ethgo.Block
t.once.Do(func() {
block, err = t.provider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return
}
if block.Number == 0 {
return
}

blocks := make([]*ethgo.Block, t.config.MaxBlockBacklog)

var i uint64
for i = 0; i < t.config.MaxBlockBacklog; i++ {
blocks[t.config.MaxBlockBacklog-i-1] = block
if block.Number == 0 {
break
}
block, err = t.provider.GetBlockByHash(block.ParentHash, false)
if err != nil {
return
}
}
broker, err := NewEventBroker(context.Background(), EventBrokerCfg{})
if err != nil {
panic(err)
}

if i != t.config.MaxBlockBacklog {
// less than maxBacklog elements
blocks = blocks[t.config.MaxBlockBacklog-i-1:]
}
t.blocks = blocks
})
return err
}
initial, err := provider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
panic(err)
}

func (b *BlockTracker) MaxBlockBacklog() uint64 {
return b.config.MaxBlockBacklog
}
b := &BlockTracker{
blocks: []*ethgo.Block{},
config: config,
tracker: tracker,
provider: provider,
eventBroker: broker,
closeCh: make(chan struct{}),
}

func (b *BlockTracker) LastBlocked() *ethgo.Block {
target := b.blocks[len(b.blocks)-1]
if target == nil {
return nil
// add an initial block
if err := b.HandleReconcile(initial); err != nil {
panic(err)
}
return target.Copy()
return b
}

func (b *BlockTracker) BlocksBlocked() []*ethgo.Block {
res := []*ethgo.Block{}
for _, i := range b.blocks {
res = append(res, i.Copy())
}
return res
// Header returns the last block of the tracked chain
func (b *BlockTracker) Header() *ethgo.Block {
b.lock.Lock()
last := b.blocks[len(b.blocks)-1].Copy()
b.lock.Unlock()
return last
}

func (b *BlockTracker) Len() int {
return len(b.blocks)
func (b *BlockTracker) Subscribe() *Subscription {
return b.eventBroker.Subscribe()
}

func (b *BlockTracker) Close() error {
Expand All @@ -162,7 +120,7 @@ func (b *BlockTracker) Start() error {
cancelFn()
}()
// start the polling
err := b.subscriber.Track(ctx, func(block *ethgo.Block) error {
err := b.tracker.Track(ctx, func(block *ethgo.Block) error {
return b.HandleReconcile(block)
})
if err != nil {
Expand All @@ -171,7 +129,7 @@ func (b *BlockTracker) Start() error {
return err
}

func (t *BlockTracker) AddBlockLocked(block *ethgo.Block) error {
func (t *BlockTracker) AddBlocks(block *ethgo.Block) error {
if uint64(len(t.blocks)) == t.config.MaxBlockBacklog {
// remove past blocks if there are more than maxReconcileBlocks
t.blocks = t.blocks[1:]
Expand Down Expand Up @@ -225,7 +183,7 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block,
count := uint64(0)
for {
if count > t.config.MaxBlockBacklog {
return nil, -1, fmt.Errorf("cannot reconcile more than max backlog values")
return nil, -1, fmt.Errorf("cannot reconcile more than '%d' max backlog values", t.config.MaxBlockBacklog)
}
count++

Expand All @@ -250,8 +208,8 @@ func (t *BlockTracker) handleReconcileImpl(block *ethgo.Block) ([]*ethgo.Block,
}

func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error) {
t.blocksLock.Lock()
defer t.blocksLock.Unlock()
t.lock.Lock()
defer t.lock.Unlock()

blocks, indx, err := t.handleReconcileImpl(block)
if err != nil {
Expand All @@ -274,7 +232,7 @@ func (t *BlockTracker) HandleBlockEvent(block *ethgo.Block) (*BlockEvent, error)
// include the new blocks
for _, block := range blocks {
blockEvnt.Added = append(blockEvnt.Added, block)
if err := t.AddBlockLocked(block); err != nil {
if err := t.AddBlocks(block); err != nil {
return nil, err
}
}
Expand All @@ -290,15 +248,7 @@ func (t *BlockTracker) HandleReconcile(block *ethgo.Block) error {
return nil
}

t.blockChsLock.Lock()
for _, ch := range t.blockChs {
select {
case ch <- blockEvnt:
default:
}
}
t.blockChsLock.Unlock()

t.eventBroker.Publish(blockEvnt)
return nil
}

Expand Down Expand Up @@ -409,41 +359,12 @@ func (s *SubscriptionBlockTracker) Track(ctx context.Context, handle func(block
return nil
}

type Lock struct {
Locked bool
lock *sync.Mutex
}

func (l *Lock) Lock() {
l.Locked = true
l.lock.Lock()
}

func (l *Lock) Unlock() {
l.Locked = false
l.lock.Unlock()
}

// EventType is the type of the event
type EventType int

const (
// EventAdd happens when a new event is included in the chain
EventAdd EventType = iota
// EventDel may happen when there is a reorg and a past event is deleted
EventDel
)

// Event is an event emitted when a new log is included
type Event struct {
Type EventType
Added []*ethgo.Log
Removed []*ethgo.Log
}

// BlockEvent is an event emitted when a new block is included
type BlockEvent struct {
Type EventType
Added []*ethgo.Block
Removed []*ethgo.Block
}

func (b *BlockEvent) Header() *ethgo.Block {
return b.Added[len(b.Added)-1]
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package blocktracker

/*
import (
"context"
"log"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/jsonrpc"
"github.com/umbracle/ethgo/testutil"
web3 "github.com/umbracle/go-web3"
"github.com/umbracle/go-web3/jsonrpc"
"github.com/umbracle/go-web3/testutil"
)

func testListener(t *testing.T, server *testutil.TestServer, tracker BlockTrackerInterface) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

blocks := make(chan *ethgo.Block)
err := tracker.Track(ctx, func(block *ethgo.Block) error {
blocks := make(chan *web3.Block)
err := tracker.Track(ctx, func(block *web3.Block) error {
blocks <- block
return nil
})
Expand Down Expand Up @@ -83,11 +83,10 @@ func TestBlockTracker_Lifecycle(t *testing.T) {

c, _ := jsonrpc.NewClient(s.HTTPAddr())
tr := NewBlockTracker(c.Eth())
assert.NoError(t, tr.Init())

go tr.Start()

sub := tr.Subscribe()
sub := tr.Subscribe().GetEventCh()
for i := 0; i < 10; i++ {
select {
case <-sub:
Expand All @@ -97,46 +96,6 @@ func TestBlockTracker_Lifecycle(t *testing.T) {
}
}

func TestBlockTracker_PopulateBlocks(t *testing.T) {
// more than maxBackLog blocks
{
l := testutil.MockList{}
l.Create(0, 15, func(b *testutil.MockBlock) {})

m := &testutil.MockClient{}
m.AddScenario(l)

tt0 := NewBlockTracker(m)

err := tt0.Init()
if err != nil {
t.Fatal(err)
}
if !testutil.CompareBlocks(l.ToBlocks()[5:], tt0.blocks) {
t.Fatal("bad")
}
}
// less than maxBackLog
{
l0 := testutil.MockList{}
l0.Create(0, 5, func(b *testutil.MockBlock) {})

m1 := &testutil.MockClient{}
m1.AddScenario(l0)

tt1 := NewBlockTracker(m1)
tt1.provider = m1

err := tt1.Init()
if err != nil {
panic(err)
}
if !testutil.CompareBlocks(l0.ToBlocks(), tt1.blocks) {
t.Fatal("bad")
}
}
}

func TestBlockTracker_Events(t *testing.T) {

type TestEvent struct {
Expand Down Expand Up @@ -336,10 +295,10 @@ func TestBlockTracker_Events(t *testing.T) {

// build past block history
for _, b := range c.History.ToBlocks() {
tt.AddBlockLocked(b)
tt.addBlocks(b)
}

sub := tt.Subscribe()
sub := tt.Subscribe().GetEventCh()
for _, b := range c.Reconcile {
if err := tt.HandleReconcile(b.block.Block()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -370,3 +329,4 @@ func TestBlockTracker_Events(t *testing.T) {
})
}
}
*/
Loading