Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flush only mode #91

Merged
merged 6 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add flush only mode
  • Loading branch information
joelsmith-2019 committed May 6, 2024
commit 9f4430fd07002b9b7fd0074af085e1b61f05999a
2 changes: 2 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagFlushInterval = "flush-interval"
flagFlushOnlyMode = "flush-only-mode"
)

func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
Expand All @@ -21,6 +22,7 @@ func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port")
cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run")
cmd.PersistentFlags().BoolP(flagFlushOnlyMode, "f", false, "only run the background flush routine (acts as a redundant relayer)")
return cmd
}

Expand Down
32 changes: 23 additions & 9 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ func Start(a *AppState) *cobra.Command {
logger := a.Logger
cfg := a.Config

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("Invalid flush interval", "error", err)
jtieri marked this conversation as resolved.
Show resolved Hide resolved
}

flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode)
if err != nil {
logger.Error("Invalid flush only flag", "error", err)
os.Exit(1)
joelsmith-2019 marked this conversation as resolved.
Show resolved Hide resolved
}

if flushInterval == 0 {
if flushOnly {
logger.Error("Flush only mode requires a flush interval")
os.Exit(1)
joelsmith-2019 marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}
}

// start API on normal relayer only
go startAPI(a)

// messageState processing queue
Expand All @@ -55,14 +76,6 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("Invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)

for name, cfg := range cfg.Chains {
Expand Down Expand Up @@ -100,7 +113,8 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval)
go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval)

go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics)

if _, ok := registeredDomains[c.Domain()]; ok {
Expand Down
83 changes: 49 additions & 34 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
flushOnlyMode bool,
flushInterval time.Duration,
) {
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)
Expand All @@ -55,43 +56,49 @@ func (e *Ethereum) StartListener(
Ready: make(chan struct{}),
}

// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress)
// FlushOnlyMode should only run the flush mechanism, otherwise start the main listener,
// otherwise consume history and incoming msgs
joelsmith-2019 marked this conversation as resolved.
Show resolved Hide resolved
if flushOnlyMode {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig)
} else {
// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress)

go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)
go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)

// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
startLookback := start - e.lookbackPeriod
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock)
// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
startLookback := start - e.lookbackPeriod

logger.Info("Finished getting history")
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock)
logger.Info("Finished getting history")

if flushInterval > 0 {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig)
}
if flushInterval > 0 {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig)
}

// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
// This will cancel `consumeStream` and `flushMechanism` routines
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("Websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.StartListener(ctx, logger, processingQueue, flushInterval)
return
// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
// This will cancel `consumeStream` and `flushMechanism` routines
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("Websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.StartListener(ctx, logger, processingQueue, flushOnlyMode, flushInterval)
return
}
}
}

Expand Down Expand Up @@ -278,11 +285,19 @@ func (e *Ethereum) flushMechanism(
messageSent abi.Event,
messageTransmitterAddress common.Address,
messageTransmitterABI abi.ABI,
flushOnlyMode bool,
flushInterval time.Duration,
sig *errSignal,
) {
logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval))

// extraFlushBlocks is used to add an extra space between latest height and last flushed block
// this setting should only be used for the secondary, flush only relayer
extraFlushBlocks := uint64(0)
if flushOnlyMode {
extraFlushBlocks = 2 * e.lookbackPeriod
}

for {
timer := time.NewTimer(flushInterval)
select {
Expand All @@ -291,7 +306,7 @@ func (e *Ethereum) flushMechanism(

// initialize first lastFlushedBlock if not set
if e.lastFlushedBlock == 0 {
e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod
e.lastFlushedBlock = latestBlock - (2*e.lookbackPeriod + extraFlushBlocks)

if latestBlock < e.lookbackPeriod {
e.lastFlushedBlock = 0
Expand All @@ -302,7 +317,7 @@ func (e *Ethereum) flushMechanism(
startBlock := e.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - e.lookbackPeriod
finishBlock := latestBlock - (e.lookbackPeriod + extraFlushBlocks)

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
Expand Down
2 changes: 1 addition & 1 deletion ethereum/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestStartListener(t *testing.T) {

processingQueue := make(chan *types.TxState, 10000)

go eth.StartListener(ctx, a.Logger, processingQueue, 0)
go eth.StartListener(ctx, a.Logger, processingQueue, false, 0)

time.Sleep(5 * time.Second)

Expand Down
2 changes: 1 addition & 1 deletion integration/eth_burn_to_noble_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go ethChain.StartListener(ctx, a.Logger, processingQueue, 0)
go ethChain.StartListener(ctx, a.Logger, processingQueue, false, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

_, _, generatedWallet := testdata.KeyTestPubAddr()
Expand Down
2 changes: 1 addition & 1 deletion integration/noble_burn_to_eth_mint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestNobleBurnToEthMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0)
go nobleChain.StartListener(ctx, a.Logger, processingQueue, false, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

ethDestinationAddress, _, err := generateEthWallet()
Expand Down
77 changes: 44 additions & 33 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (n *Noble) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
flushOnlyMode bool,
flushInterval_ time.Duration,
) {
logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain())
Expand Down Expand Up @@ -48,40 +49,42 @@ func (n *Noble) StartListener(
}
blockQueue := make(chan uint64, n.blockQueueChannelSize)

// history
currentBlock -= lookback
for currentBlock <= chainTip {
blockQueue <- currentBlock
currentBlock++
}
if !flushOnlyMode {
// history
currentBlock -= lookback
for currentBlock <= chainTip {
blockQueue <- currentBlock
currentBlock++
}

// listen for new blocks
go func() {
// inner function to queue blocks
queueBlocks := func() {
chainTip = n.LatestBlock()
if chainTip >= currentBlock {
for i := currentBlock; i <= chainTip; i++ {
blockQueue <- i
// listen for new blocks
go func() {
// inner function to queue blocks
queueBlocks := func() {
chainTip = n.LatestBlock()
if chainTip >= currentBlock {
for i := currentBlock; i <= chainTip; i++ {
blockQueue <- i
}
currentBlock = chainTip + 1
}
currentBlock = chainTip + 1
}
}

// initial queue
queueBlocks()

for {
timer := time.NewTimer(6 * time.Second)
select {
case <-timer.C:
queueBlocks()
case <-ctx.Done():
timer.Stop()
return
// initial queue
queueBlocks()

for {
timer := time.NewTimer(6 * time.Second)
select {
case <-timer.C:
queueBlocks()
case <-ctx.Done():
timer.Stop()
return
}
}
}
}()
}()
}

// constantly query for blocks
for i := 0; i < int(n.workers); i++ {
Expand Down Expand Up @@ -115,7 +118,7 @@ func (n *Noble) StartListener(
}

if flushInterval > 0 {
go n.flushMechanism(ctx, logger, blockQueue)
go n.flushMechanism(ctx, logger, blockQueue, flushOnlyMode)
}

<-ctx.Done()
Expand All @@ -135,8 +138,16 @@ func (n *Noble) flushMechanism(
ctx context.Context,
logger log.Logger,
blockQueue chan uint64,
flushOnlyMode bool,
) {
logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval))
logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval))

// extraFlushBlocks is used to add an extra space between latest height and last flushed block
// this setting should only be used for the secondary, flush only relayer
extraFlushBlocks := uint64(0)
if flushOnlyMode {
extraFlushBlocks = 2 * n.lookbackPeriod
}

for {
timer := time.NewTimer(flushInterval)
Expand All @@ -157,7 +168,7 @@ func (n *Noble) flushMechanism(

// initialize first lastFlushedBlock if not set
if n.lastFlushedBlock == 0 {
n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod)
n.lastFlushedBlock = latestBlock - (2*n.lookbackPeriod + extraFlushBlocks)

if latestBlock < n.lookbackPeriod {
n.lastFlushedBlock = 0
Expand All @@ -168,7 +179,7 @@ func (n *Noble) flushMechanism(
startBlock := n.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - n.lookbackPeriod
finishBlock := latestBlock - (n.lookbackPeriod + extraFlushBlocks)

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
Expand Down
2 changes: 1 addition & 1 deletion noble/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) {

processingQueue := make(chan *types.TxState, 10000)

go n.StartListener(ctx, a.Logger, processingQueue, 0)
go n.StartListener(ctx, a.Logger, processingQueue, false, 0)

time.Sleep(20 * time.Second)

Expand Down
1 change: 1 addition & 0 deletions types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Chain interface {
ctx context.Context,
logger log.Logger,
processingQueue chan *TxState,
flushOnlyMode bool,
flushInterval time.Duration,
)

Expand Down