Skip to content

Commit

Permalink
chains: rename package headtracker to heads; remove redundant prefixe…
Browse files Browse the repository at this point in the history
…s; remove types package
  • Loading branch information
jmank88 committed Jan 30, 2025
1 parent 466bfbb commit adbfed7
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 243 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package headtracker
package heads

import (
"context"
Expand All @@ -16,32 +16,32 @@ import (

const TrackableCallbackTimeout = 2 * time.Second

type callbackSet[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] map[int]HeadTrackable[H, BLOCK_HASH]
type callbackSet[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] map[int]Trackable[H, BLOCK_HASH]

func (set callbackSet[H, BLOCK_HASH]) values() []HeadTrackable[H, BLOCK_HASH] {
var values []HeadTrackable[H, BLOCK_HASH]
func (set callbackSet[H, BLOCK_HASH]) values() []Trackable[H, BLOCK_HASH] {
var values []Trackable[H, BLOCK_HASH]
for _, callback := range set {
values = append(values, callback)
}
return values
}

// HeadTrackable is implemented by the core txm to be able to receive head events from any chain.
// Trackable is implemented by the core txm to be able to receive head events from any chain.
// Chain implementations should notify head events to the core txm via this interface.
type HeadTrackable[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
type Trackable[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
// OnNewLongestChain sends a new head when it becomes available. Subscribers can recursively trace the parent
// of the head to the finalized block back.
OnNewLongestChain(ctx context.Context, head H)
}

// HeadBroadcaster relays new Heads to all subscribers.
type HeadBroadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
// Broadcaster relays new Heads to all subscribers.
type Broadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] interface {
services.Service
BroadcastNewLongestChain(H)
Subscribe(callback HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func())
Subscribe(callback Trackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func())
}

type headBroadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] struct {
type broadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] struct {
services.Service
eng *services.Engine

Expand All @@ -52,14 +52,14 @@ type headBroadcaster[H chains.Head[BLOCK_HASH], BLOCK_HASH chains.Hashable] stru
lastCallbackID int
}

// NewHeadBroadcaster creates a new HeadBroadcaster
func NewHeadBroadcaster[
// NewBroadcaster creates a new Broadcaster
func NewBroadcaster[
H chains.Head[BLOCK_HASH],
BLOCK_HASH chains.Hashable,
](
lggr logger.Logger,
) HeadBroadcaster[H, BLOCK_HASH] {
hb := &headBroadcaster[H, BLOCK_HASH]{
) Broadcaster[H, BLOCK_HASH] {
hb := &broadcaster[H, BLOCK_HASH]{
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: mailbox.NewSingle[H](),
}
Expand All @@ -71,70 +71,70 @@ func NewHeadBroadcaster[
return hb
}

func (hb *headBroadcaster[H, BLOCK_HASH]) start(context.Context) error {
hb.eng.Go(hb.run)
func (b *broadcaster[H, BLOCK_HASH]) start(context.Context) error {
b.eng.Go(b.run)
return nil
}

func (hb *headBroadcaster[H, BLOCK_HASH]) close() error {
hb.mutex.Lock()
func (b *broadcaster[H, BLOCK_HASH]) close() error {
b.mutex.Lock()
// clear all callbacks
hb.callbacks = make(callbackSet[H, BLOCK_HASH])
hb.mutex.Unlock()
b.callbacks = make(callbackSet[H, BLOCK_HASH])
b.mutex.Unlock()
return nil
}

func (hb *headBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
hb.mailbox.Deliver(head)
func (b *broadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
b.mailbox.Deliver(head)
}

// Subscribe subscribes to OnNewLongestChain and Connect until HeadBroadcaster is closed,
// Subscribe subscribes to OnNewLongestChain and Connect until Broadcaster is closed,
// or unsubscribe callback is called explicitly
func (hb *headBroadcaster[H, BLOCK_HASH]) Subscribe(callback HeadTrackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
hb.mutex.Lock()
defer hb.mutex.Unlock()
func (b *broadcaster[H, BLOCK_HASH]) Subscribe(callback Trackable[H, BLOCK_HASH]) (currentLongestChain H, unsubscribe func()) {
b.mutex.Lock()
defer b.mutex.Unlock()

currentLongestChain = hb.latest
currentLongestChain = b.latest

hb.lastCallbackID++
callbackID := hb.lastCallbackID
hb.callbacks[callbackID] = callback
b.lastCallbackID++
callbackID := b.lastCallbackID
b.callbacks[callbackID] = callback
unsubscribe = func() {
hb.mutex.Lock()
defer hb.mutex.Unlock()
delete(hb.callbacks, callbackID)
b.mutex.Lock()
defer b.mutex.Unlock()
delete(b.callbacks, callbackID)
}

return
}

func (hb *headBroadcaster[H, BLOCK_HASH]) run(ctx context.Context) {
func (b *broadcaster[H, BLOCK_HASH]) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-hb.mailbox.Notify():
hb.executeCallbacks(ctx)
case <-b.mailbox.Notify():
b.executeCallbacks(ctx)
}
}
}

// DEV: the head relayer makes no promises about head delivery! Subscribing
// Jobs should expect to the relayer to skip heads if there is a large number of listeners
// and all callbacks cannot be completed in the allotted time.
func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context) {
head, exists := hb.mailbox.Retrieve()
func (b *broadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context) {
head, exists := b.mailbox.Retrieve()
if !exists {
hb.eng.Info("No head to retrieve. It might have been skipped")
b.eng.Info("No head to retrieve. It might have been skipped")
return
}

hb.mutex.Lock()
callbacks := hb.callbacks.values()
hb.latest = head
hb.mutex.Unlock()
b.mutex.Lock()
callbacks := b.callbacks.values()
b.latest = head
b.mutex.Unlock()

hb.eng.Debugw("Initiating callbacks",
b.eng.Debugw("Initiating callbacks",
"headNum", head.BlockNumber(),
"numCallbacks", len(callbacks),
)
Expand All @@ -143,14 +143,14 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context)
wg.Add(len(callbacks))

for _, callback := range callbacks {
go func(trackable HeadTrackable[H, BLOCK_HASH]) {
go func(trackable Trackable[H, BLOCK_HASH]) {
defer wg.Done()
start := time.Now()
cctx, cancel := context.WithTimeout(ctx, TrackableCallbackTimeout)
defer cancel()
trackable.OnNewLongestChain(cctx, head)
elapsed := time.Since(start)
hb.eng.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
b.eng.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
"callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed)
}(callback)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package types
package heads

import (
"context"
Expand Down
6 changes: 3 additions & 3 deletions chains/headtracker/types/head.go → chains/heads/head.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package types
package heads

import (
"github.com/smartcontractkit/chainlink-framework/chains"
)

type Head[BLOCK_HASH chains.Hashable, CHAIN_ID chains.ID] interface {
chains.Head[BLOCK_HASH]
// ChainID returns the chain ID that the head is for
// ChainID returns the chain ID of the head.
ChainID() CHAIN_ID
// Returns true if the head has a chain Id
// HasChainID returns true if the head has a chain ID.
HasChainID() bool
// IsValid returns true if the head is valid.
IsValid() bool
Expand Down
Loading

0 comments on commit adbfed7

Please sign in to comment.