From 91c81f4c348cc65f8631e100e0cc507bb3e17b87 Mon Sep 17 00:00:00 2001 From: bendanzhentan <455462586@qq.com> Date: Wed, 13 Dec 2023 10:02:48 +0800 Subject: [PATCH] refactor: split into indexer.go --- cmd/bot/run.go | 106 +--------------------- core/indexer.go | 233 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+), 102 deletions(-) create mode 100644 core/indexer.go diff --git a/cmd/bot/run.go b/cmd/bot/run.go index 84effb4..7e3cb6f 100644 --- a/cmd/bot/run.go +++ b/cmd/bot/run.go @@ -5,21 +5,17 @@ import ( "context" "errors" "fmt" - "math/big" "strings" "time" "github.com/ethereum-optimism/optimism/indexer/config" oplog "github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/retry" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" "gorm.io/driver/mysql" "gorm.io/gorm" - "gorm.io/gorm/clause" ) func RunCommand(ctx *cli.Context) error { @@ -64,7 +60,10 @@ func RunCommand(ctx *cli.Context) error { return err } - go WatchBotDelegatedWithdrawals(ctx.Context, logger, db, l2Client, l2ScannedBlock, cfg) + go func() { + indexer := core.NewIndexer(logger, db, l2Client, cfg) + indexer.Start(ctx.Context, l2ScannedBlock) + }() go ProcessBotDelegatedWithdrawals(ctx.Context, logger, db, l1Client, l2Client, cfg) <-ctx.Context.Done() @@ -214,103 +213,6 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg } } -// storeLogs stores the logs in the database -func storeLogs(db *gorm.DB, client *core.ClientExt, logs []types.Log) error { - // save all the logs in this range of blocks - for _, vLog := range logs { - header, err := client.HeaderByHash(context.Background(), vLog.BlockHash) - if err != nil { - return err - } - - event := core.WithdrawalInitiatedLog{ - TransactionHash: vLog.TxHash.Hex(), - LogIndex: int(vLog.Index), - InitiatedBlockNumber: int64(header.Number.Uint64()), - } - - deduped := db.Clauses( - clause.OnConflict{DoNothing: true}, - ) - result := deduped.Create(&event) - if result.Error != nil { - return result.Error - } - } - - return nil -} - -// WatchBotDelegatedWithdrawals watches for new bot-delegated withdrawals and stores them in the database. -func WatchBotDelegatedWithdrawals(ctx context.Context, log log.Logger, db *gorm.DB, client *core.ClientExt, l2StartingBlock *core.L2ScannedBlock, cfg core.Config) { - timer := time.NewTimer(0) - fromBlockNumber := big.NewInt(l2StartingBlock.Number) - - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - timer.Reset(time.Second) - } - - toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(cfg.L2StandardBridgeBot.LogFilterBlockRange)) - finalizedHeader, err := client.GetHeaderByTag(context.Background(), "finalized") - if err != nil { - log.Error("call eth_blockNumber", "error", err) - continue - } - if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() { - toBlockNumber = finalizedHeader.Number - } - - if fromBlockNumber.Uint64() > toBlockNumber.Uint64() { - timer.Reset(5 * time.Second) - continue - } - - log.Info("Fetching logs from blocks", "fromBlock", fromBlockNumber, "toBlock", toBlockNumber) - logs, err := getLogs(client, fromBlockNumber, toBlockNumber, common.HexToAddress(cfg.L2StandardBridgeBot.ContractAddress), core.WithdrawToEventSig()) - if err != nil { - log.Error("eth_getLogs", "error", err) - continue - } - - if len(logs) != 0 { - for _, vlog := range logs { - log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex()) - } - - err = storeLogs(db, client, logs) - if err != nil { - log.Error("storeLogs", "error", err) - continue - } - } - - l2StartingBlock.Number = toBlockNumber.Int64() - result := db.Save(l2StartingBlock) - if result.Error != nil { - log.Error("update l2_scanned_blocks", "error", result.Error) - } - - fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1)) - } -} - -// getLogs returns the logs for a given contract address and block range -func getLogs(client *core.ClientExt, fromBlock *big.Int, toBlock *big.Int, contractAddress common.Address, eventSig common.Hash) ([]types.Log, error) { - query := ethereum.FilterQuery{ - FromBlock: fromBlock, - ToBlock: toBlock, - Addresses: []common.Address{ - contractAddress, - }, - Topics: [][]common.Hash{[]common.Hash{eventSig}}, - } - return client.FilterLogs(context.Background(), query) -} - // connect connects to the database func connect(log log.Logger, dbConfig config.DBConfig) (*gorm.DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local", dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name) diff --git a/core/indexer.go b/core/indexer.go new file mode 100644 index 0000000..38ac868 --- /dev/null +++ b/core/indexer.go @@ -0,0 +1,233 @@ +package core + +import ( + bindings2 "bnbchain/opbnb-bridge-bot/bindings" + "context" + "math/big" + "time" + + "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type Indexer struct { + log log.Logger + db *gorm.DB + l2Client *ClientExt + cfg Config + + contracts []common.Address + + // Note: feeVaults indicates the configured list of FeeVault contracts: + // - [SequencerFeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/SequencerFeeVault.sol) + // - [BaseFeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/BaseFeeVault.sol) + // - [L1FeeVault.sol](https://github.com/bnb-chain/opbnb/blob/develop/packages/contracts-bedrock/contracts/L2/L1FeeVault.sol) + isFeeVaultWithdrawEvent func(vlog *types.Log) bool + isL2StandardBridgeWithdrawalInitiatedLog func(vlog *types.Log) bool + isL2StandardBridgeBotWithdrawToEvent func(vlog *types.Log) bool +} + +func NewIndexer(log log.Logger, db *gorm.DB, l2Client *ClientExt, cfg Config) *Indexer { + l2StandardBridgeBots := make(map[common.Address]struct{}) + feeVaults := make(map[common.Address]struct{}) + contracts := make([]common.Address, 0) + for _, addr := range cfg.L2StandardBridgeBot.ContractAddresses { + addr_ := common.HexToAddress(addr) + l2StandardBridgeBots[addr_] = struct{}{} + contracts = append(contracts, addr_) + } + for _, addr := range cfg.FeeVault.ContractAddresses { + addr_ := common.HexToAddress(addr) + feeVaults[addr_] = struct{}{} + contracts = append(contracts, addr_) + } + + isL2StandardBridgeWithdrawalInitiatedLog := func(vlog *types.Log) bool { + var ( + L2StandardBridgeAbi, _ = bindings.L2StandardBridgeMetaData.GetAbi() + L2StandardBridgeWithdrawalInitiatedSig = L2StandardBridgeAbi.Events["WithdrawalInitiated"].ID + ) + return len(vlog.Topics) > 1 && vlog.Topics[0] == L2StandardBridgeWithdrawalInitiatedSig + } + isL2StandardBridgeBotWithdrawToEvent := func(log *types.Log) bool { + var ( + L2StandardBridgeBotAbi, _ = bindings2.L2StandardBridgeBotMetaData.GetAbi() + L2StandardBridgeBotWithdrawToSig = L2StandardBridgeBotAbi.Events["WithdrawTo"].ID + ) + if len(log.Topics) > 0 && log.Topics[0] == L2StandardBridgeBotWithdrawToSig { + _, ok := l2StandardBridgeBots[log.Address] + return ok + } + return false + } + isFeeVaultWithdrawEvent := func(log *types.Log) bool { + var ( + FeeVaultAbi, _ = bindings.L1FeeVaultMetaData.GetAbi() + FeeVaultWithdrawalSig = FeeVaultAbi.Events["Withdrawal"].ID + ) + if len(log.Topics) > 0 && log.Topics[0] == FeeVaultWithdrawalSig { + _, ok := feeVaults[log.Address] + return ok + } + return false + } + + return &Indexer{ + log: log, + db: db, + l2Client: l2Client, + cfg: cfg, + isL2StandardBridgeWithdrawalInitiatedLog: isL2StandardBridgeWithdrawalInitiatedLog, + isL2StandardBridgeBotWithdrawToEvent: isL2StandardBridgeBotWithdrawToEvent, + isFeeVaultWithdrawEvent: isFeeVaultWithdrawEvent, + contracts: contracts, + } +} + +// Start watches for new bot-delegated withdrawals and stores them in the database. +func (i *Indexer) Start(ctx context.Context, l2ScannedBlock *L2ScannedBlock) { + timer := time.NewTimer(0) + fromBlockNumber := big.NewInt(l2ScannedBlock.Number) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + timer.Reset(time.Second) + } + + toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(i.cfg.L2StandardBridgeBot.LogFilterBlockRange)) + finalizedHeader, err := i.l2Client.GetHeaderByTag(context.Background(), "finalized") + if err != nil { + log.Error("call eth_blockNumber", "error", err) + continue + } + if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() { + toBlockNumber = finalizedHeader.Number + } + + if fromBlockNumber.Uint64() > toBlockNumber.Uint64() { + timer.Reset(5 * time.Second) + continue + } + + logs, err := i.getWithdrawalInitiatedLogs(ctx, fromBlockNumber, toBlockNumber) + if err != nil { + log.Error("eth_getLogs", "error", err) + continue + } + + if len(logs) != 0 { + for _, vlog := range logs { + log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex()) + } + + err = i.storeLogs(logs) + if err != nil { + log.Error("storeLogs", "error", err) + continue + } + } + + l2ScannedBlock.Number = toBlockNumber.Int64() + result := i.db.Where("number >= 0").Updates(l2ScannedBlock) + if result.Error != nil { + log.Error("update l2_scanned_blocks", "error", result.Error) + } + + fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1)) + } +} + +func (i *Indexer) getWithdrawalInitiatedLogs(ctx context.Context, fromBlock *big.Int, toBlock *big.Int) ([]types.Log, error) { + i.log.Debug("Fetching logs from blocks", "fromBlock", fromBlock, "toBlock", toBlock) + + var ( + L2StandardBridgeBotAbi, _ = bindings2.L2StandardBridgeBotMetaData.GetAbi() + L2StandardBridgeBotWithdrawToSig = L2StandardBridgeBotAbi.Events["WithdrawTo"].ID + + FeeVaultAbi, _ = bindings.L1FeeVaultMetaData.GetAbi() + FeeVaultWithdrawalSig = FeeVaultAbi.Events["Withdrawal"].ID + ) + + logs, err := i.l2Client.FilterLogs(context.Background(), ethereum.FilterQuery{ + FromBlock: fromBlock, + ToBlock: toBlock, + Addresses: i.contracts, + Topics: [][]common.Hash{[]common.Hash{L2StandardBridgeBotWithdrawToSig, FeeVaultWithdrawalSig}}, + }) + if err != nil { + return nil, err + } + + withdrawalInitiatedLogs := make([]types.Log, 0) + for _, vlog := range logs { + receipt, err := i.l2Client.TransactionReceipt(ctx, vlog.TxHash) + if err != nil { + return nil, err + } + + if i.isL2StandardBridgeBotWithdrawToEvent(&vlog) && vlog.Index >= 5 && len(receipt.Logs) >= 6 { + // Events flow: + // + // event[i-5]: WithdrawalInitiated + // event[i-4]: ETHBridgeInitiated + // event[i-3]: MessagePassed + // event[i-2]: SentMessage + // event[i-1]: SentMessageExtension1 + // event[i] : L2StandardBridgeBot.WithdrawTo + if i.isL2StandardBridgeWithdrawalInitiatedLog(receipt.Logs[vlog.Index-5]) { + withdrawalInitiatedLogs = append(withdrawalInitiatedLogs, *receipt.Logs[vlog.Index-5]) + } else { + i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog) + } + } else if i.isFeeVaultWithdrawEvent(&vlog) && len(receipt.Logs) >= int(vlog.Index)+5 { + // Events flow: + // + // event[i] : FeeVault.Withdrawal + // event[i+1]: WithdrawalInitiated + // event[i+2]: ETHBridgeInitiated + // event[i+3]: MessagePassed + // event[i+4]: SentMessage + // event[i+5]: SentMessageExtension1 + if i.isL2StandardBridgeWithdrawalInitiatedLog(receipt.Logs[vlog.Index+1]) { + withdrawalInitiatedLogs = append(withdrawalInitiatedLogs, *receipt.Logs[vlog.Index+5]) + } else { + i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog) + } + } else { + i.log.Crit("eth_getLogs returned an unexpected event", "log", vlog) + } + } + + return withdrawalInitiatedLogs, nil +} + +// storeLogs stores the logs in the database +func (i *Indexer) storeLogs(logs []types.Log) error { + // save all the logs in this range of blocks + for _, vLog := range logs { + header, err := i.l2Client.HeaderByHash(context.Background(), vLog.BlockHash) + if err != nil { + return err + } + + deduped := i.db.Clauses(clause.OnConflict{DoNothing: true}) + result := deduped.Create(&WithdrawalInitiatedLog{ + TransactionHash: vLog.TxHash.Hex(), + LogIndex: int(vLog.Index), + InitiatedBlockNumber: int64(header.Number.Uint64()), + }) + if result.Error != nil { + return result.Error + } + } + + return nil +}