Skip to content

Commit

Permalink
fix start logic
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 14, 2024
1 parent 0db60af commit 5fed395
Showing 1 changed file with 13 additions and 16 deletions.
29 changes: 13 additions & 16 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,24 @@ func (e *Ethereum) startListenerRoutines(
processingQueue chan *types.TxState,
) {

// start main stream (does not account for lookback period)
// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger)
go e.consumeStream(ctx, logger, processingQueue, stream, sub)
consumeHistroy(logger, history, processingQueue)

// query history pertaining to lookback period
if e.lookbackPeriod != 0 {
latestBlock := e.LatestBlock()
start := latestBlock - e.lookbackPeriod
end := latestBlock
logger.Info(fmt.Sprintf("starting lookback of %d blocks", e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, start, end)
logger.Info(fmt.Sprintf("finished lookback of %d blocks", e.lookbackPeriod))
// get history for start block and lookback period
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
start -= e.lookbackPeriod
logger.Info(fmt.Sprintf("getting history: start block:%d looking back %d blocks", e.startBlock, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, start, latestBlock)
logger.Info("finished getting history")

// start flush timer
go e.flushMechanism(ctx, logger, processingQueue, sub)

}

func (e *Ethereum) startMainStream(
Expand All @@ -89,14 +90,10 @@ func (e *Ethereum) startMainStream(

etherReader := etherstream.Reader{Backend: e.wsClient}

if e.startBlock == 0 {
e.startBlock = e.LatestBlock()
}

latestBlock := e.LatestBlock()

// start initial stream (lookback period handled separately)
logger.Info(fmt.Sprintf("Starting Ethereum listener at block %d", e.startBlock))
// start initial stream (start-block and lookback period handled separately)
logger.Info("Starting Ethereum listener")

query := ethereum.FilterQuery{
Addresses: []common.Address{messageTransmitterAddress},
Expand Down

0 comments on commit 5fed395

Please sign in to comment.