Skip to content

Commit

Permalink
latest block mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 14, 2024
1 parent ef0c7e8 commit 0db60af
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 50 deletions.
6 changes: 5 additions & 1 deletion cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func Start(a *AppState) *cobra.Command {
updateLatestHeight := 1 * time.Second
go c.TrackLatestBlockHeight(cmd.Context(), logger, updateLatestHeight)

time.Sleep(5 * time.Second)
// wait until height is available
for c.LatestBlock() == 0 {
time.Sleep(1 * time.Second)
}

if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil {
logger.Error("Error initializing broadcaster", "error", err)
os.Exit(1)
Expand Down
11 changes: 10 additions & 1 deletion ethereum/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,16 @@ func (e *Ethereum) Domain() types.Domain {
}

func (e *Ethereum) LatestBlock() uint64 {
return e.latestBlock
e.mu.Lock()
block := e.latestBlock
e.mu.Unlock()
return block
}

func (e *Ethereum) SetLatestBlock(block uint64) {
e.mu.Lock()
e.latestBlock = block
e.mu.Unlock()
}

func (e *Ethereum) LastFlushedBlock() uint64 {
Expand Down
52 changes: 12 additions & 40 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"cosmossdk.io/log"
ethereum "github.com/ethereum/go-ethereum"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -69,7 +68,7 @@ func (e *Ethereum) startListenerRoutines(

// query history pertaining to lookback period
if e.lookbackPeriod != 0 {
latestBlock := e.latestBlock
latestBlock := e.LatestBlock()
start := latestBlock - e.lookbackPeriod
end := latestBlock
logger.Info(fmt.Sprintf("starting lookback of %d blocks", e.lookbackPeriod))
Expand All @@ -91,10 +90,10 @@ func (e *Ethereum) startMainStream(
etherReader := etherstream.Reader{Backend: e.wsClient}

if e.startBlock == 0 {
e.startBlock = e.latestBlock
e.startBlock = e.LatestBlock()
}

latestBlock := e.latestBlock
latestBlock := e.LatestBlock()

// start initial stream (lookback period handled separately)
logger.Info(fmt.Sprintf("Starting Ethereum listener at block %d", e.startBlock))
Expand Down Expand Up @@ -253,7 +252,7 @@ func (e *Ethereum) flushMechanism(
timer := time.NewTimer(5 * time.Minute)
select {
case <-timer.C:
latestBlock := e.latestBlock
latestBlock := e.LatestBlock()

if e.lastFlushedBlock == 0 {
e.lastFlushedBlock = latestBlock
Expand All @@ -265,6 +264,8 @@ func (e *Ethereum) flushMechanism(

e.getAndConsumeHistory(ctx, logger, processingQueue, start, latestBlock)

e.lastFlushedBlock = latestBlock

logger.Info("flush complete")

// if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected
Expand All @@ -287,7 +288,9 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
if err != nil {
logger.Error("Error getting lastest block height:", err)
}
e.latestBlock = header.Number.Uint64()
if err == nil {
e.SetLatestBlock(header.Number.Uint64())
}

// then start loop on a timer
for {
Expand All @@ -299,7 +302,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
logger.Error("Error getting lastest block height:", err)
continue
}
e.latestBlock = header.Number.Uint64()
e.SetLatestBlock(header.Number.Uint64())

case <-ctx.Done():
timer.Stop()
Expand All @@ -312,63 +315,33 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
logger = logger.With("metric", "wallet blance", "chain", e.name, "domain", e.domain)
queryRate := 30 * time.Second

var err error
var client *ethclient.Client

account := common.HexToAddress(e.minterAddress)

exponent := big.NewInt(int64(e.MetricsExponent)) // ex: 18
scaleFactor := new(big.Float).SetInt(new(big.Int).Exp(big.NewInt(10), exponent, nil)) // ex: 10^18

defer func() {
if client != nil {
client.Close()
}
}()

first := make(chan struct{}, 1)
first <- struct{}{}
createClient := true
for {
timer := time.NewTimer(queryRate)
select {
// don't wait the "queryRate" amount of time if this is the first time running
case <-first:
timer.Stop()
if createClient {
client, err = ethclient.DialContext(ctx, e.rpcURL)
if err != nil {
logger.Error(fmt.Sprintf("error dialing eth client. Will try again in %d sec", queryRate), "error", err)
createClient = true
continue
}
}
balance, err := client.BalanceAt(ctx, account, nil)
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err)
createClient = true
continue
}

balanceBigFloat := new(big.Float).SetInt(balance)
balanceScaled, _ := new(big.Float).Quo(balanceBigFloat, scaleFactor).Float64()

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)

createClient = false
case <-timer.C:
if createClient {
client, err = ethclient.DialContext(ctx, e.rpcURL)
if err != nil {
logger.Error(fmt.Sprintf("error dialing eth client. Will try again in %d sec", queryRate), "error", err)
createClient = true
continue
}
}
balance, err := client.BalanceAt(ctx, account, nil)
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err)
createClient = true
continue
}

Expand All @@ -377,7 +350,6 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m

m.SetWalletBalance(e.name, e.minterAddress, e.MetricsDenom, balanceScaled)

createClient = false
case <-ctx.Done():
timer.Stop()
return
Expand Down
11 changes: 10 additions & 1 deletion noble/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,16 @@ func (n *Noble) Domain() types.Domain {
}

func (n *Noble) LatestBlock() uint64 {
return n.latestBlock
n.mu.Lock()
block := n.latestBlock
n.mu.Unlock()
return block
}

func (n *Noble) SetLatestBlock(block uint64) {
n.mu.Lock()
n.latestBlock = block
n.mu.Unlock()
}

func (n *Noble) LastFlushedBlock() uint64 {
Expand Down
14 changes: 7 additions & 7 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (n *Noble) StartListener(
logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain())

if n.startBlock == 0 {
n.startBlock = n.latestBlock
n.startBlock = n.LatestBlock()
}

logger.Info(fmt.Sprintf("Starting Noble listener at block %d looking back %d blocks",
Expand All @@ -35,7 +35,7 @@ func (n *Noble) StartListener(
// enqueue block heights
currentBlock := n.startBlock
lookback := n.lookbackPeriod
chainTip := n.latestBlock
chainTip := n.LatestBlock()

if n.blockQueueChannelSize == 0 {
n.blockQueueChannelSize = defaultBlockQueueChannelSize
Expand All @@ -58,15 +58,15 @@ func (n *Noble) StartListener(
select {
case <-first:
timer.Stop()
chainTip = n.latestBlock
chainTip = n.LatestBlock()
if chainTip >= currentBlock {
for i := currentBlock; i <= chainTip; i++ {
blockQueue <- i
}
currentBlock = chainTip + 1
}
case <-timer.C:
chainTip = n.latestBlock
chainTip = n.LatestBlock()
if chainTip >= currentBlock {
for i := currentBlock; i <= chainTip; i++ {
blockQueue <- i
Expand Down Expand Up @@ -126,7 +126,7 @@ func (n *Noble) flushMechanism(
timer := time.NewTimer(5 * time.Minute)
select {
case <-timer.C:
latestBlock := n.latestBlock
latestBlock := n.LatestBlock()

if n.lastFlushedBlock == 0 {
n.lastFlushedBlock = latestBlock
Expand Down Expand Up @@ -159,7 +159,7 @@ func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, l
if err != nil {
logger.Error("unable to query Nobles latest height", "err", err)
}
n.latestBlock = uint64(res.SyncInfo.LatestBlockHeight)
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))

// then start loop on a timer
for {
Expand All @@ -171,7 +171,7 @@ func (n *Noble) TrackLatestBlockHeight(ctx context.Context, logger log.Logger, l
logger.Error("unable to query Nobles latest height", "err", err)
continue
}
n.latestBlock = uint64(res.SyncInfo.LatestBlockHeight)
n.SetLatestBlock(uint64(res.SyncInfo.LatestBlockHeight))
case <-ctx.Done():
timer.Stop()
return
Expand Down
3 changes: 3 additions & 0 deletions types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Chain interface {
// LatestBlockain returns the last queired height of the chain
LatestBlock() uint64

// SetLatestBlock sets the latest block
SetLatestBlock(block uint64)

// LastFlushedBlock returns the last block included in a flush. In the rare situation of a crash,
// this block is a good block to start at to catch up on any missed transactions.
LastFlushedBlock() uint64
Expand Down

0 comments on commit 0db60af

Please sign in to comment.