Skip to content

Commit

Permalink
use services.Config.NewService/Engine (#13851)
Browse files Browse the repository at this point in the history
* use services.Config.NewService/Engine

* feedback
  • Loading branch information
jmank88 authored Aug 6, 2024
1 parent 2312827 commit 69f7bd6
Show file tree
Hide file tree
Showing 25 changed files with 608 additions and 906 deletions.
70 changes: 26 additions & 44 deletions common/headtracker/head_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ type HeadBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interf
}

type headBroadcaster[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct {
services.StateMachine
logger logger.Logger
services.Service
eng *services.Engine

callbacks callbackSet[H, BLOCK_HASH]
mailbox *mailbox.Mailbox[H]
mutex sync.Mutex
chClose services.StopChan
wgDone sync.WaitGroup
latest H
lastCallbackID int
}
Expand All @@ -60,41 +59,29 @@ func NewHeadBroadcaster[
](
lggr logger.Logger,
) HeadBroadcaster[H, BLOCK_HASH] {
return &headBroadcaster[H, BLOCK_HASH]{
logger: logger.Named(lggr, "HeadBroadcaster"),
hb := &headBroadcaster[H, BLOCK_HASH]{
callbacks: make(callbackSet[H, BLOCK_HASH]),
mailbox: mailbox.NewSingle[H](),
chClose: make(chan struct{}),
}
hb.Service, hb.eng = services.Config{
Name: "HeadBroadcaster",
Start: hb.start,
Close: hb.close,
}.NewServiceEngine(lggr)
return hb
}

func (hb *headBroadcaster[H, BLOCK_HASH]) Start(context.Context) error {
return hb.StartOnce("HeadBroadcaster", func() error {
hb.wgDone.Add(1)
go hb.run()
return nil
})
}

func (hb *headBroadcaster[H, BLOCK_HASH]) Close() error {
return hb.StopOnce("HeadBroadcaster", func() error {
hb.mutex.Lock()
// clear all callbacks
hb.callbacks = make(callbackSet[H, BLOCK_HASH])
hb.mutex.Unlock()

close(hb.chClose)
hb.wgDone.Wait()
return nil
})
func (hb *headBroadcaster[H, BLOCK_HASH]) start(context.Context) error {
hb.eng.Go(hb.run)
return nil
}

func (hb *headBroadcaster[H, BLOCK_HASH]) Name() string {
return hb.logger.Name()
}

func (hb *headBroadcaster[H, BLOCK_HASH]) HealthReport() map[string]error {
return map[string]error{hb.Name(): hb.Healthy()}
func (hb *headBroadcaster[H, BLOCK_HASH]) close() error {
hb.mutex.Lock()
// clear all callbacks
hb.callbacks = make(callbackSet[H, BLOCK_HASH])
hb.mutex.Unlock()
return nil
}

func (hb *headBroadcaster[H, BLOCK_HASH]) BroadcastNewLongestChain(head H) {
Expand All @@ -121,26 +108,24 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) Subscribe(callback HeadTrackable[H, BL
return
}

func (hb *headBroadcaster[H, BLOCK_HASH]) run() {
defer hb.wgDone.Done()

func (hb *headBroadcaster[H, BLOCK_HASH]) run(ctx context.Context) {
for {
select {
case <-hb.chClose:
case <-ctx.Done():
return
case <-hb.mailbox.Notify():
hb.executeCallbacks()
hb.executeCallbacks(ctx)
}
}
}

// DEV: the head relayer makes no promises about head delivery! Subscribing
// Jobs should expect to the relayer to skip heads if there is a large number of listeners
// and all callbacks cannot be completed in the allotted time.
func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks(ctx context.Context) {
head, exists := hb.mailbox.Retrieve()
if !exists {
hb.logger.Info("No head to retrieve. It might have been skipped")
hb.eng.Info("No head to retrieve. It might have been skipped")
return
}

Expand All @@ -149,17 +134,14 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
hb.latest = head
hb.mutex.Unlock()

hb.logger.Debugw("Initiating callbacks",
hb.eng.Debugw("Initiating callbacks",
"headNum", head.BlockNumber(),
"numCallbacks", len(callbacks),
)

wg := sync.WaitGroup{}
wg.Add(len(callbacks))

ctx, cancel := hb.chClose.NewCtx()
defer cancel()

for _, callback := range callbacks {
go func(trackable HeadTrackable[H, BLOCK_HASH]) {
defer wg.Done()
Expand All @@ -168,7 +150,7 @@ func (hb *headBroadcaster[H, BLOCK_HASH]) executeCallbacks() {
defer cancel()
trackable.OnNewLongestChain(cctx, head)
elapsed := time.Since(start)
hb.logger.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
hb.eng.Debugw(fmt.Sprintf("Finished callback in %s", elapsed),
"callbackType", reflect.TypeOf(trackable), "blockNumber", head.BlockNumber(), "time", elapsed)
}(callback)
}
Expand Down
73 changes: 41 additions & 32 deletions common/headtracker/head_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ var (
}, []string{"ChainID"})
)

// headHandler is a callback that handles incoming heads
type headHandler[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] func(ctx context.Context, header H) error
// HeadHandler is a callback that handles incoming heads
type HeadHandler[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] func(ctx context.Context, header H) error

// HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node
type HeadListener[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
// ListenForNewHeads kicks off the listen loop (not thread safe)
// done() must be executed upon leaving ListenForNewHeads()
ListenForNewHeads(onSubscribe func(), handleNewHead headHandler[H, BLOCK_HASH], done func())
services.Service

// ListenForNewHeads runs the listen loop (not thread safe)
ListenForNewHeads(ctx context.Context)

// ReceivingHeads returns true if the listener is receiving heads (thread safe)
ReceivingHeads() bool
Expand All @@ -54,10 +55,13 @@ type headListener[
ID types.ID,
BLOCK_HASH types.Hashable,
] struct {
services.Service
eng *services.Engine

config htrktypes.Config
client htrktypes.Client[HTH, S, ID, BLOCK_HASH]
logger logger.Logger
chStop services.StopChan
onSubscription func(context.Context)
handleNewHead HeadHandler[HTH, BLOCK_HASH]
chHeaders chan HTH
headSubscription types.Subscription
connected atomic.Bool
Expand All @@ -74,38 +78,43 @@ func NewHeadListener[
lggr logger.Logger,
client CLIENT,
config htrktypes.Config,
chStop chan struct{},
onSubscription func(context.Context),
handleNewHead HeadHandler[HTH, BLOCK_HASH],
) HeadListener[HTH, BLOCK_HASH] {
return &headListener[HTH, S, ID, BLOCK_HASH]{
config: config,
client: client,
logger: logger.Named(lggr, "HeadListener"),
chStop: chStop,
hl := &headListener[HTH, S, ID, BLOCK_HASH]{
config: config,
client: client,
onSubscription: onSubscription,
handleNewHead: handleNewHead,
}
hl.Service, hl.eng = services.Config{
Name: "HeadListener",
Start: hl.start,
}.NewServiceEngine(lggr)
return hl
}

func (hl *headListener[HTH, S, ID, BLOCK_HASH]) Name() string {
return hl.logger.Name()
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) start(context.Context) error {
hl.eng.Go(hl.ListenForNewHeads)
return nil
}

func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(onSubscription func(), handleNewHead headHandler[HTH, BLOCK_HASH], done func()) {
defer done()
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) ListenForNewHeads(ctx context.Context) {
defer hl.unsubscribe()

ctx, cancel := hl.chStop.NewCtx()
defer cancel()

for {
if !hl.subscribe(ctx) {
break
}

onSubscription()
err := hl.receiveHeaders(ctx, handleNewHead)
if hl.onSubscription != nil {
hl.onSubscription(ctx)
}
err := hl.receiveHeaders(ctx, hl.handleNewHead)
if ctx.Err() != nil {
break
} else if err != nil {
hl.logger.Errorw("Error in new head subscription, unsubscribed", "err", err)
hl.eng.Errorw("Error in new head subscription, unsubscribed", "err", err)
continue
}
break
Expand All @@ -131,7 +140,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error
return map[string]error{hl.Name(): err}
}

func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead headHandler[HTH, BLOCK_HASH]) error {
func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Context, handleNewHead HeadHandler[HTH, BLOCK_HASH]) error {
var noHeadsAlarmC <-chan time.Time
var noHeadsAlarmT *time.Ticker
noHeadsAlarmDuration := hl.config.BlockEmissionIdleWarningThreshold()
Expand All @@ -142,7 +151,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte

for {
select {
case <-hl.chStop:
case <-ctx.Done():
return nil

case blockHeader, open := <-hl.chHeaders:
Expand All @@ -158,13 +167,13 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte
return errors.New("head listener: chHeaders prematurely closed")
}
if !blockHeader.IsValid() {
hl.logger.Error("got nil block header")
hl.eng.Error("got nil block header")
continue
}

// Compare the chain ID of the block header to the chain ID of the client
if !blockHeader.HasChainID() || blockHeader.ChainID().String() != chainId.String() {
hl.logger.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID())
hl.eng.Panicf("head listener for %s received block header for %s", chainId, blockHeader.ChainID())
}
promNumHeadsReceived.WithLabelValues(chainId.String()).Inc()

Expand All @@ -184,7 +193,7 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) receiveHeaders(ctx context.Conte

case <-noHeadsAlarmC:
// We haven't received a head on the channel for a long time, log a warning
hl.logger.Warnf("have not received a head for %v", noHeadsAlarmDuration)
hl.eng.Warnf("have not received a head for %v", noHeadsAlarmDuration)
hl.receivingHeads.Store(false)
}
}
Expand All @@ -198,19 +207,19 @@ func (hl *headListener[HTH, S, ID, BLOCK_HASH]) subscribe(ctx context.Context) b
for {
hl.unsubscribe()

hl.logger.Debugf("Subscribing to new heads on chain %s", chainId.String())
hl.eng.Debugf("Subscribing to new heads on chain %s", chainId.String())

select {
case <-hl.chStop:
case <-ctx.Done():
return false

case <-time.After(subscribeRetryBackoff.Duration()):
err := hl.subscribeToHead(ctx)
if err != nil {
promEthConnectionErrors.WithLabelValues(chainId.String()).Inc()
hl.logger.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err)
hl.eng.Warnw("Failed to subscribe to heads on chain", "chainID", chainId.String(), "err", err)
} else {
hl.logger.Debugf("Subscribed to heads on chain %s", chainId.String())
hl.eng.Debugf("Subscribed to heads on chain %s", chainId.String())
return true
}
}
Expand Down
Loading

0 comments on commit 69f7bd6

Please sign in to comment.