Skip to content

Commit

Permalink
error sync
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 14, 2024
1 parent 5fed395 commit 23d84ff
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"os"
"sync"
"time"

"cosmossdk.io/log"
Expand All @@ -25,6 +26,29 @@ var (
messageTransmitterAddress common.Address
)

type errorSync struct {
sync.Mutex
trigger chan struct{}
err error
next *errorSync
}

func NewErrorSync() *errorSync {
return &errorSync{
trigger: make(chan struct{}),
}
}

func (es *errorSync) SetError(err error) {
es.Lock()
defer es.Unlock()
if es.err != nil {
return
}
es.err = err
close(es.trigger)
}

func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -52,9 +76,6 @@ func (e *Ethereum) StartListener(

// startListenerRoutines starts the ethereum websocket subscription, queries history pertaining to the lookback period,
// and starts the reoccurring flush
//
// we pass the subscription from the initial queryEth() to flushMechanism() so in the case the
// websocket becomes disconnect, we stop and then re-start the flush.
func (e *Ethereum) startListenerRoutines(
ctx context.Context,
logger log.Logger,
Expand All @@ -63,7 +84,15 @@ func (e *Ethereum) startListenerRoutines(

// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger)
go e.consumeStream(ctx, logger, processingQueue, stream, sub)
ethSubErrorSync := NewErrorSync()

go func() {
err := <-sub.Err()
logger.Error("websocket disconnected. Will re-connect", "err", err)
ethSubErrorSync.SetError(err)
}()

go e.consumeStream(ctx, logger, processingQueue, stream, ethSubErrorSync)
consumeHistroy(logger, history, processingQueue)

// get history for start block and lookback period
Expand All @@ -78,7 +107,7 @@ func (e *Ethereum) startListenerRoutines(
logger.Info("finished getting history")

// start flush timer
go e.flushMechanism(ctx, logger, processingQueue, sub)
go e.flushMechanism(ctx, logger, processingQueue, ethSubErrorSync)
}

func (e *Ethereum) startMainStream(
Expand Down Expand Up @@ -198,19 +227,19 @@ func (e *Ethereum) consumeStream(
logger log.Logger,
processingQueue chan *types.TxState,
stream <-chan ethtypes.Log,
sub ethereum.Subscription,
errSync *errorSync,
) {
var txState *types.TxState
for {
select {
case <-ctx.Done():
return
case err := <-sub.Err():
case <-errSync.trigger:
// setting start block to 0 will start listener from latsest height.
// in the rare case we are waiting for the websocket to come back on line for a long period of time,
// we'll rely on the latestFlushBlock to flush out all missted transactions
e.startBlock = 0
logger.Error("connection closed. Restarting...", "err", err)
logger.Error("stream routine stopped")
e.startListenerRoutines(ctx, logger, processingQueue)
return
case streamLog := <-stream:
Expand Down Expand Up @@ -242,7 +271,7 @@ func (e *Ethereum) flushMechanism(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
sub ethereum.Subscription,
errSync *errorSync,
) {

for {
Expand All @@ -266,9 +295,9 @@ func (e *Ethereum) flushMechanism(
logger.Info("flush complete")

// if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected
case <-sub.Err():
case <-errSync.trigger:
timer.Stop()
logger.Info("websocket disconnected, stopping flush mechanism. Will restart after websocket is re-established")
logger.Info("flush stopped. Will restart after websocket is re-established")
return
case <-ctx.Done():
timer.Stop()
Expand Down

0 comments on commit 23d84ff

Please sign in to comment.