Skip to content

Commit

Permalink
processingQueue fix, flush flag, spelling
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 18, 2024
1 parent 869103b commit 2107822
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 25 deletions.
12 changes: 7 additions & 5 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
)

const (
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagFlushInterval = "flush-interval"
)

func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
cmd.PersistentFlags().StringVar(&a.ConfigPath, flagConfigPath, defaultConfigPath, "file path of config file")
cmd.PersistentFlags().BoolVarP(&a.Debug, flagVerbose, "v", false, fmt.Sprintf("use this flag to set log level to `debug` (overrides %s flag)", flagLogLevel))
cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port")
cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run")
return cmd

}
Expand Down
10 changes: 9 additions & 1 deletion cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)

for name, cfg := range cfg.Chains {
Expand Down Expand Up @@ -83,7 +91,7 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

go c.StartListener(cmd.Context(), logger, processingQueue)
go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval)
go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics)

if _, ok := registeredDomains[c.Domain()]; ok {
Expand Down
31 changes: 18 additions & 13 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
messageSent abi.Event
messageTransmitterAddress common.Address
processingQueue chan *types.TxState
flushInterval time.Duration
)

// errSignal allows broadcasting an error value to multiple receivers.
Expand All @@ -38,10 +39,12 @@ func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue_ chan *types.TxState,
flushInterval_ time.Duration,
) {
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)

processingQueue = processingQueue_
flushInterval = flushInterval_

messageTransmitter, err := content.ReadFile("abi/MessageTransmitter.json")
if err != nil {
Expand All @@ -57,17 +60,16 @@ func (e *Ethereum) StartListener(
messageSent = messageTransmitterABI.Events["MessageSent"]
messageTransmitterAddress = common.HexToAddress(e.messageTransmitterAddress)

e.startListenerRoutines(ctx, logger, processingQueue)
e.startListenerRoutines(ctx, logger)
}

// startListenerRoutines starts the ethereum websocket subscription, queries history pertaining to the lookback period,
// and starts the reoccurring flush
//
// If an error occurs in websocket strea, this function will handle relevant sub routines and then re-run iteself.
// If an error occurs in websocket stream, this function will handle relevant sub routines and then re-run itself.
func (e *Ethereum) startListenerRoutines(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
) {

sig := &errSignal{
Expand All @@ -78,7 +80,7 @@ func (e *Ethereum) startListenerRoutines(
stream, sub, history := e.startMainStream(ctx, logger)

go e.consumeStream(ctx, logger, stream, sig)
consumeHistroy(logger, history)
consumeHistory(logger, history)

// get history from start-lookback up until latest block
latestBlock := e.LatestBlock()
Expand All @@ -92,7 +94,9 @@ func (e *Ethereum) startListenerRoutines(

logger.Info("finished getting history")

go e.flushMechanism(ctx, logger, sig)
if flushInterval > 0 {
go e.flushMechanism(ctx, logger, sig)
}

// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
Expand All @@ -107,7 +111,7 @@ func (e *Ethereum) startListenerRoutines(
// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.startListenerRoutines(ctx, logger, processingQueue)
e.startListenerRoutines(ctx, logger)
return
}

Expand Down Expand Up @@ -196,16 +200,16 @@ func (e *Ethereum) getAndConsumeHistory(
break
}
toUnSub.Unsubscribe()
consumeHistroy(logger, history)
consumeHistory(logger, history)

start += chunkSize
chunk++
}
}

// consumeHistroy consumes the hisroty from a QueryWithHistory() go-ethereum call.
// consumeHistory consumes the history from a QueryWithHistory() go-ethereum call.
// it passes messages to the processingQueue
func consumeHistroy(
func consumeHistory(
logger log.Logger,
history []ethtypes.Log,
) {
Expand Down Expand Up @@ -268,9 +272,10 @@ func (e *Ethereum) flushMechanism(
logger log.Logger,
sig *errSignal,
) {
logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval))

for {
timer := time.NewTimer(5 * time.Minute)
timer := time.NewTimer(flushInterval)
select {
case <-timer.C:
latestBlock := e.LatestBlock()
Expand Down Expand Up @@ -307,7 +312,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
// first time
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Error(fmt.Sprintf("error getting lastest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
logger.Error(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
}
if err == nil {
e.SetLatestBlock(header.Number.Uint64())
Expand All @@ -320,7 +325,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
case <-timer.C:
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Debug(fmt.Sprintf("error getting lastest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
logger.Debug(fmt.Sprintf("error getting latest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
continue
}
e.SetLatestBlock(header.Number.Uint64())
Expand All @@ -333,7 +338,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
}

func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m *relayer.PromMetrics) {
logger = logger.With("metric", "wallet blance", "chain", e.name, "domain", e.domain)
logger = logger.With("metric", "wallet blannce", "chain", e.name, "domain", e.domain)
queryRate := 30 * time.Second

account := common.HexToAddress(e.minterAddress)
Expand Down
2 changes: 1 addition & 1 deletion ethereum/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) {

processingQueue := make(chan *types.TxState, 10000)

go eth.StartListener(ctx, a.Logger, processingQueue)
go eth.StartListener(ctx, a.Logger, processingQueue, 0)

time.Sleep(5 * time.Second)

Expand Down
2 changes: 1 addition & 1 deletion integration/eth_burn_to_noble_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestEthBurnToNobleMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go ethChain.StartListener(ctx, a.Logger, processingQueue)
go ethChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)

_, _, generatedWallet := testdata.KeyTestPubAddr()
Expand Down
2 changes: 1 addition & 1 deletion integration/noble_burn_to_eth_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestNobleBurnToEthMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go nobleChain.StartListener(ctx, a.Logger, processingQueue)
go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap)

ethDestinationAddress, _, err := generateEthWallet()
Expand Down
23 changes: 21 additions & 2 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

var flushInterval time.Duration

func (n *Noble) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
flushInterval_ time.Duration,
) {
logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain())

Expand Down Expand Up @@ -112,7 +115,9 @@ func (n *Noble) StartListener(
}()
}

go n.flushMechanism(ctx, logger, blockQueue)
if flushInterval > 0 {
go n.flushMechanism(ctx, logger, blockQueue)
}

<-ctx.Done()
}
Expand All @@ -122,12 +127,26 @@ func (n *Noble) flushMechanism(
logger log.Logger,
blockQueue chan uint64,
) {

logger.Debug(fmt.Sprintf("flush mechanism started. Will flush every %v", flushInterval))

for {
timer := time.NewTimer(5 * time.Minute)
timer := time.NewTimer(flushInterval)
select {
case <-timer.C:
latestBlock := n.LatestBlock()

// test to see that the rpc is available before attempting flush
res, err := n.cc.RPCClient.Status(ctx)
if err != nil {
logger.Error(fmt.Sprintf("skipping flush... error reaching out to rpc, will retry flush in %v", flushInterval))
continue
}
if res.SyncInfo.CatchingUp {
logger.Error(fmt.Sprintf("skipping flush... rpc still catching, will retry flush in %v", flushInterval))
continue
}

if n.lastFlushedBlock == 0 {
n.lastFlushedBlock = latestBlock
}
Expand Down
2 changes: 1 addition & 1 deletion noble/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestStartListener(t *testing.T) {

processingQueue := make(chan *types.TxState, 10000)

go n.StartListener(ctx, a.Logger, processingQueue)
go n.StartListener(ctx, a.Logger, processingQueue, 0)

time.Sleep(20 * time.Second)

Expand Down
1 change: 1 addition & 0 deletions types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Chain interface {
ctx context.Context,
logger log.Logger,
processingQueue chan *TxState,
flushInterval time.Duration,
)

// Broadcast broadcasts CCTP mint messages to the chain.
Expand Down

0 comments on commit 2107822

Please sign in to comment.