Skip to content

Commit

Permalink
refactor: split into indexer.go
Browse files Browse the repository at this point in the history
  • Loading branch information
bendanzhentan committed Dec 13, 2023
1 parent ae8819a commit 91c81f4
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 102 deletions.
106 changes: 4 additions & 102 deletions cmd/bot/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum-optimism/optimism/indexer/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func RunCommand(ctx *cli.Context) error {
Expand Down Expand Up @@ -64,7 +60,10 @@ func RunCommand(ctx *cli.Context) error {
return err
}

go WatchBotDelegatedWithdrawals(ctx.Context, logger, db, l2Client, l2ScannedBlock, cfg)
go func() {
indexer := core.NewIndexer(logger, db, l2Client, cfg)
indexer.Start(ctx.Context, l2ScannedBlock)
}()
go ProcessBotDelegatedWithdrawals(ctx.Context, logger, db, l1Client, l2Client, cfg)

<-ctx.Context.Done()
Expand Down Expand Up @@ -214,103 +213,6 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg
}
}

// storeLogs stores the logs in the database
func storeLogs(db *gorm.DB, client *core.ClientExt, logs []types.Log) error {
// save all the logs in this range of blocks
for _, vLog := range logs {
header, err := client.HeaderByHash(context.Background(), vLog.BlockHash)
if err != nil {
return err
}

event := core.WithdrawalInitiatedLog{
TransactionHash: vLog.TxHash.Hex(),
LogIndex: int(vLog.Index),
InitiatedBlockNumber: int64(header.Number.Uint64()),
}

deduped := db.Clauses(
clause.OnConflict{DoNothing: true},
)
result := deduped.Create(&event)
if result.Error != nil {
return result.Error
}
}

return nil
}

// WatchBotDelegatedWithdrawals watches for new bot-delegated withdrawals and stores them in the database.
func WatchBotDelegatedWithdrawals(ctx context.Context, log log.Logger, db *gorm.DB, client *core.ClientExt, l2StartingBlock *core.L2ScannedBlock, cfg core.Config) {
timer := time.NewTimer(0)
fromBlockNumber := big.NewInt(l2StartingBlock.Number)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
timer.Reset(time.Second)
}

toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(cfg.L2StandardBridgeBot.LogFilterBlockRange))
finalizedHeader, err := client.GetHeaderByTag(context.Background(), "finalized")
if err != nil {
log.Error("call eth_blockNumber", "error", err)
continue
}
if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() {
toBlockNumber = finalizedHeader.Number
}

if fromBlockNumber.Uint64() > toBlockNumber.Uint64() {
timer.Reset(5 * time.Second)
continue
}

log.Info("Fetching logs from blocks", "fromBlock", fromBlockNumber, "toBlock", toBlockNumber)
logs, err := getLogs(client, fromBlockNumber, toBlockNumber, common.HexToAddress(cfg.L2StandardBridgeBot.ContractAddress), core.WithdrawToEventSig())
if err != nil {
log.Error("eth_getLogs", "error", err)
continue
}

if len(logs) != 0 {
for _, vlog := range logs {
log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex())
}

err = storeLogs(db, client, logs)
if err != nil {
log.Error("storeLogs", "error", err)
continue
}
}

l2StartingBlock.Number = toBlockNumber.Int64()
result := db.Save(l2StartingBlock)
if result.Error != nil {
log.Error("update l2_scanned_blocks", "error", result.Error)
}

fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1))
}
}

// getLogs returns the logs for a given contract address and block range
func getLogs(client *core.ClientExt, fromBlock *big.Int, toBlock *big.Int, contractAddress common.Address, eventSig common.Hash) ([]types.Log, error) {
query := ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []common.Address{
contractAddress,
},
Topics: [][]common.Hash{[]common.Hash{eventSig}},
}
return client.FilterLogs(context.Background(), query)
}

// connect connects to the database
func connect(log log.Logger, dbConfig config.DBConfig) (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local", dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name)
Expand Down
233 changes: 233 additions & 0 deletions core/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package core

import (
bindings2 "bnbchain/opbnb-bridge-bot/bindings"
"context"
"math/big"
"time"

"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type Indexer struct {
log log.Logger
db *gorm.DB
l2Client *ClientExt
cfg Config

contracts []common.Address

// Note: feeVaults indicates the configured list of FeeVault contracts:
// - [SequencerFeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/SequencerFeeVault.sol)
// - [BaseFeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/BaseFeeVault.sol)
// - [L1FeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/L1FeeVault.sol)
isFeeVaultWithdrawEvent func(vlog *types.Log) bool
isL2StandardBridgeWithdrawalInitiatedLog func(vlog *types.Log) bool
isL2StandardBridgeBotWithdrawToEvent func(vlog *types.Log) bool
}

func NewIndexer(log log.Logger, db *gorm.DB, l2Client *ClientExt, cfg Config) *Indexer {
l2StandardBridgeBots := make(map[common.Address]struct{})
feeVaults := make(map[common.Address]struct{})
contracts := make([]common.Address, 0)
for _, addr := range cfg.L2StandardBridgeBot.ContractAddresses {
addr_ := common.HexToAddress(addr)
l2StandardBridgeBots[addr_] = struct{}{}
contracts = append(contracts, addr_)
}
for _, addr := range cfg.FeeVault.ContractAddresses {
addr_ := common.HexToAddress(addr)
feeVaults[addr_] = struct{}{}
contracts = append(contracts, addr_)
}

isL2StandardBridgeWithdrawalInitiatedLog := func(vlog *types.Log) bool {
var (
L2StandardBridgeAbi, _ = bindings.L2StandardBridgeMetaData.GetAbi()
L2StandardBridgeWithdrawalInitiatedSig = L2StandardBridgeAbi.Events["WithdrawalInitiated"].ID
)
return len(vlog.Topics) > 1 && vlog.Topics[0] == L2StandardBridgeWithdrawalInitiatedSig
}
isL2StandardBridgeBotWithdrawToEvent := func(log *types.Log) bool {
var (
L2StandardBridgeBotAbi, _ = bindings2.L2StandardBridgeBotMetaData.GetAbi()
L2StandardBridgeBotWithdrawToSig = L2StandardBridgeBotAbi.Events["WithdrawTo"].ID
)
if len(log.Topics) > 0 && log.Topics[0] == L2StandardBridgeBotWithdrawToSig {
_, ok := l2StandardBridgeBots[log.Address]
return ok
}
return false
}
isFeeVaultWithdrawEvent := func(log *types.Log) bool {
var (
FeeVaultAbi, _ = bindings.L1FeeVaultMetaData.GetAbi()
FeeVaultWithdrawalSig = FeeVaultAbi.Events["Withdrawal"].ID
)
if len(log.Topics) > 0 && log.Topics[0] == FeeVaultWithdrawalSig {
_, ok := feeVaults[log.Address]
return ok
}
return false
}

return &Indexer{
log: log,
db: db,
l2Client: l2Client,
cfg: cfg,
isL2StandardBridgeWithdrawalInitiatedLog: isL2StandardBridgeWithdrawalInitiatedLog,
isL2StandardBridgeBotWithdrawToEvent: isL2StandardBridgeBotWithdrawToEvent,
isFeeVaultWithdrawEvent: isFeeVaultWithdrawEvent,
contracts: contracts,
}
}

// Start watches for new bot-delegated withdrawals and stores them in the database.
func (i *Indexer) Start(ctx context.Context, l2ScannedBlock *L2ScannedBlock) {
timer := time.NewTimer(0)
fromBlockNumber := big.NewInt(l2ScannedBlock.Number)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
timer.Reset(time.Second)
}

toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(i.cfg.L2StandardBridgeBot.LogFilterBlockRange))
finalizedHeader, err := i.l2Client.GetHeaderByTag(context.Background(), "finalized")
if err != nil {
log.Error("call eth_blockNumber", "error", err)
continue
}
if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() {
toBlockNumber = finalizedHeader.Number
}

if fromBlockNumber.Uint64() > toBlockNumber.Uint64() {
timer.Reset(5 * time.Second)
continue
}

logs, err := i.getWithdrawalInitiatedLogs(ctx, fromBlockNumber, toBlockNumber)
if err != nil {
log.Error("eth_getLogs", "error", err)
continue
}

if len(logs) != 0 {
for _, vlog := range logs {
log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex())
}

err = i.storeLogs(logs)
if err != nil {
log.Error("storeLogs", "error", err)
continue
}
}

l2ScannedBlock.Number = toBlockNumber.Int64()
result := i.db.Where("number >= 0").Updates(l2ScannedBlock)
if result.Error != nil {
log.Error("update l2_scanned_blocks", "error", result.Error)
}

fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1))
}
}

func (i *Indexer) getWithdrawalInitiatedLogs(ctx context.Context, fromBlock *big.Int, toBlock *big.Int) ([]types.Log, error) {
i.log.Debug("Fetching logs from blocks", "fromBlock", fromBlock, "toBlock", toBlock)

var (
L2StandardBridgeBotAbi, _ = bindings2.L2StandardBridgeBotMetaData.GetAbi()
L2StandardBridgeBotWithdrawToSig = L2StandardBridgeBotAbi.Events["WithdrawTo"].ID

FeeVaultAbi, _ = bindings.L1FeeVaultMetaData.GetAbi()
FeeVaultWithdrawalSig = FeeVaultAbi.Events["Withdrawal"].ID
)

logs, err := i.l2Client.FilterLogs(context.Background(), ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: i.contracts,
Topics: [][]common.Hash{[]common.Hash{L2StandardBridgeBotWithdrawToSig, FeeVaultWithdrawalSig}},
})
if err != nil {
return nil, err
}

withdrawalInitiatedLogs := make([]types.Log, 0)
for _, vlog := range logs {
receipt, err := i.l2Client.TransactionReceipt(ctx, vlog.TxHash)
if err != nil {
return nil, err
}

if i.isL2StandardBridgeBotWithdrawToEvent(&vlog) && vlog.Index >= 5 && len(receipt.Logs) >= 6 {
// Events flow:
//
// event[i-5]: WithdrawalInitiated
// event[i-4]: ETHBridgeInitiated
// event[i-3]: MessagePassed
// event[i-2]: SentMessage
// event[i-1]: SentMessageExtension1
// event[i] : L2StandardBridgeBot.WithdrawTo
if i.isL2StandardBridgeWithdrawalInitiatedLog(receipt.Logs[vlog.Index-5]) {
withdrawalInitiatedLogs = append(withdrawalInitiatedLogs, *receipt.Logs[vlog.Index-5])
} else {
i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog)
}
} else if i.isFeeVaultWithdrawEvent(&vlog) && len(receipt.Logs) >= int(vlog.Index)+5 {
// Events flow:
//
// event[i] : FeeVault.Withdrawal
// event[i+1]: WithdrawalInitiated
// event[i+2]: ETHBridgeInitiated
// event[i+3]: MessagePassed
// event[i+4]: SentMessage
// event[i+5]: SentMessageExtension1
if i.isL2StandardBridgeWithdrawalInitiatedLog(receipt.Logs[vlog.Index+1]) {
withdrawalInitiatedLogs = append(withdrawalInitiatedLogs, *receipt.Logs[vlog.Index+5])
} else {
i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog)
}
} else {
i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog)
}
}

return withdrawalInitiatedLogs, nil
}

// storeLogs stores the logs in the database
func (i *Indexer) storeLogs(logs []types.Log) error {
// save all the logs in this range of blocks
for _, vLog := range logs {
header, err := i.l2Client.HeaderByHash(context.Background(), vLog.BlockHash)
if err != nil {
return err
}

deduped := i.db.Clauses(clause.OnConflict{DoNothing: true})
result := deduped.Create(&WithdrawalInitiatedLog{
TransactionHash: vLog.TxHash.Hex(),
LogIndex: int(vLog.Index),
InitiatedBlockNumber: int64(header.Number.Uint64()),
})
if result.Error != nil {
return result.Error
}
}

return nil
}

0 comments on commit 91c81f4

Please sign in to comment.