From f5b9b1cb29e49caef8eb0c62cd09667097144570 Mon Sep 17 00:00:00 2001 From: Alexgao001 Date: Wed, 18 Oct 2023 13:26:30 +0800 Subject: [PATCH] support OPBNB crosschain --- common/const.go | 2 +- config/config.go | 5 ++++ config/config.json | 1 + executor/bsc_executor.go | 12 ++------ executor/const.go | 1 - listener/bsc_listener.go | 49 +++++++++++++++++++++++++++---- listener/greenfield_listener.go | 20 +++++++++++-- vote/bsc_vote_processor.go | 34 +++++++++++++++++++-- vote/greenfield_vote_processor.go | 14 ++++++--- 9 files changed, 112 insertions(+), 26 deletions(-) diff --git a/common/const.go b/common/const.go index 160b270..e6af0c2 100644 --- a/common/const.go +++ b/common/const.go @@ -19,7 +19,7 @@ const ( OracleChannelId types.ChannelId = 0 SleepTimeAfterSyncLightBlock = 15 * time.Second - ListenerPauseTime = 2 * time.Second + ListenerPauseTime = 3 * time.Second ErrorRetryInterval = 1 * time.Second AssembleInterval = 500 * time.Millisecond diff --git a/config/config.go b/config/config.go index a94fbbc..9dfa391 100644 --- a/config/config.go +++ b/config/config.go @@ -70,6 +70,7 @@ func (cfg *GreenfieldConfig) Validate() { } type BSCConfig struct { + OpBNB bool `json:"op_bnb"` KeyType string `json:"key_type"` AWSRegion string `json:"aws_region"` AWSSecretName string `json:"aws_secret_name"` @@ -110,6 +111,10 @@ func (cfg *BSCConfig) Validate() { } } +func (cfg *BSCConfig) IsOpCrossChain() bool { + return cfg.OpBNB +} + type RelayConfig struct { BSCToGreenfieldInturnRelayerTimeout int64 `json:"bsc_to_greenfield_inturn_relayer_timeout"` // in second GreenfieldToBSCInturnRelayerTimeout int64 `json:"greenfield_to_bsc_inturn_relayer_timeout"` // in second diff --git a/config/config.json b/config/config.json index 6c5951c..58e0e3d 100644 --- a/config/config.json +++ b/config/config.json @@ -18,6 +18,7 @@ "use_websocket": true }, "bsc_config": { + "op_bnb": false, "key_type": "local_private_key", "aws_region": "", "aws_secret_name": "", diff --git a/executor/bsc_executor.go b/executor/bsc_executor.go index 0710654..22778b3 100644 --- a/executor/bsc_executor.go +++ b/executor/bsc_executor.go @@ -355,14 +355,10 @@ func (e *BSCExecutor) getTransactor(nonce uint64) (*bind.TransactOpts, error) { txOpts.Nonce = big.NewInt(int64(nonce)) txOpts.Value = big.NewInt(0) txOpts.GasLimit = e.config.BSCConfig.GasLimit - txOpts.GasPrice = e.getGasPrice() + txOpts.GasPrice = e.gasPrice return txOpts, nil } -func (e *BSCExecutor) getGasPrice() *big.Int { - return e.gasPrice -} - func (e *BSCExecutor) SyncTendermintLightBlock(height uint64) (common.Hash, error) { ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) defer cancel() @@ -461,11 +457,7 @@ func (e *BSCExecutor) QueryCachedLatestValidators() ([]rtypes.Validator, error) if len(e.relayers) != 0 { return e.relayers, nil } - relayers, err := e.QueryLatestValidators() - if err != nil { - return nil, err - } - return relayers, nil + return e.QueryLatestValidators() } func (e *BSCExecutor) UpdateCachedLatestValidatorsLoop() { diff --git a/executor/const.go b/executor/const.go index 7f9632f..a38c25b 100644 --- a/executor/const.go +++ b/executor/const.go @@ -19,5 +19,4 @@ const ( var ( BSCBalanceThreshold = big.NewInt(1000000000000000000) // when relayer is lower than 1BNB, it should try to claim rewards BSCRewardThreshold = big.NewInt(100000000000000000) // if reward is lower than 0.1 BNB, it will not be claimed. - ) diff --git a/listener/bsc_listener.go b/listener/bsc_listener.go index ef9b7ca..c07ffde 100644 --- a/listener/bsc_listener.go +++ b/listener/bsc_listener.go @@ -68,7 +68,13 @@ func (l *BSCListener) poll() error { if nextHeight <= latestPolledBlockHeight { nextHeight = latestPolledBlockHeight + 1 } - latestBlockHeight, err := l.bscExecutor.GetLatestFinalizedBlockHeightWithRetry() + var latestBlockHeight uint64 + if l.isOpCrossChain() { + // currently Get finalized block is not support by OPBNB yet + latestBlockHeight, err = l.bscExecutor.GetLatestBlockHeightWithRetry() + } else { + latestBlockHeight, err = l.bscExecutor.GetLatestFinalizedBlockHeightWithRetry() + } if err != nil { logging.Logger.Errorf("failed to get latest finalized blockHeight, error: %s", err.Error()) return err @@ -78,7 +84,7 @@ func (l *BSCListener) poll() error { return nil } } - if err = l.monitorCrossChainPkgAt(nextHeight); err != nil { + if err = l.monitorCrossChainPkgAt(nextHeight, latestPolledBlock); err != nil { return err } return nil @@ -88,7 +94,7 @@ func (l *BSCListener) getLatestPolledBlock() (*model.BscBlock, error) { return l.DaoManager.BSCDao.GetLatestBlock() } -func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error { +func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64, latestPolledBlock *model.BscBlock) error { nextHeightBlockHeader, err := l.bscExecutor.GetBlockHeaderAtHeight(nextHeight) if err != nil { return fmt.Errorf("failed to get latest block header, error: %s", err.Error()) @@ -98,6 +104,18 @@ func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error { return nil } logging.Logger.Infof("retrieved BSC block header at height=%d", nextHeight) + + if l.config.BSCConfig.IsOpCrossChain() { + // check if the latest polled block in DB is forked, if so, delete it. + isForked, err := l.isForkedBlockAndDelete(latestPolledBlock, nextHeight, nextHeightBlockHeader.ParentHash) + if err != nil { + return err + } + if isForked { + return fmt.Errorf("there is fork at block height=%d", latestPolledBlock.Height) + } + } + logs, err := l.queryCrossChainLogs(nextHeightBlockHeader.Hash()) if err != nil { return fmt.Errorf("failed to get logs from block at height=%d, err=%s", nextHeight, err.Error()) @@ -114,7 +132,6 @@ func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error { logging.Logger.Errorf("failed to parse event log, txHash=%s, err=%s", log.TxHash, err.Error()) continue } - if relayPkg == nil { continue } @@ -148,6 +165,18 @@ func (l *BSCListener) queryCrossChainLogs(blockHash ethcommon.Hash) ([]types.Log return logs, nil } +func (l *BSCListener) isForkedBlockAndDelete(latestPolledBlock *model.BscBlock, nextHeight uint64, parentHash ethcommon.Hash) (bool, error) { + if latestPolledBlock.Height != 0 && latestPolledBlock.Height+1 == nextHeight && parentHash.String() != latestPolledBlock.BlockHash { + // delete latestPolledBlock and its cross-chain packages from DB + if err := l.DaoManager.BSCDao.DeleteBlockAndPackagesAtHeight(latestPolledBlock.Height); err != nil { + return true, err + } + logging.Logger.Infof("deleted block at height=%d from DB due to there is a fork", latestPolledBlock.Height) + return true, nil + } + return false, nil +} + func (l *BSCListener) getCrossChainPackageEventHash() ethcommon.Hash { return ethcommon.HexToHash(CrossChainPackageEventHex) } @@ -180,8 +209,18 @@ func (l *BSCListener) PurgeLoop() { logging.Logger.Errorf("failed to delete bsc packages, err=%s", err.Error()) continue } - if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(blockHeightThreshHold, uint32(votepool.FromBscCrossChainEvent), DeletionLimit); err != nil { + var eventType votepool.EventType + if l.isOpCrossChain() { + eventType = votepool.FromOpCrossChainEvent + } else { + eventType = votepool.FromBscCrossChainEvent + } + if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(blockHeightThreshHold, uint32(eventType), DeletionLimit); err != nil { logging.Logger.Errorf("failed to delete votes, err=%s", err.Error()) } } } + +func (l *BSCListener) isOpCrossChain() bool { + return l.config.BSCConfig.IsOpCrossChain() +} diff --git a/listener/greenfield_listener.go b/listener/greenfield_listener.go index f49b5c4..c75d20b 100644 --- a/listener/greenfield_listener.go +++ b/listener/greenfield_listener.go @@ -122,6 +122,9 @@ func (l *GreenfieldListener) monitorTxEvents(block *tmtypes.Block, txRes []*abci errChan <- err return } + if relayTx.DestChainId != l.destChainId() { + break + } relayTx.TxHash = hex.EncodeToString(block.Txs[idx].Hash()) txChan <- relayTx } @@ -138,6 +141,9 @@ func (l *GreenfieldListener) monitorEndBlockEvents(height uint64, endBlockEvents errChan <- err return } + if relayTx.DestChainId != l.destChainId() { + break + } txChan <- relayTx } } @@ -222,7 +228,7 @@ func (l *GreenfieldListener) calNextHeight() (uint64, error) { return 0, fmt.Errorf("failed to get latest block height, error: %s", err.Error()) } // pauses relayer for a bit since it already caught the newest block - if int64(nextHeight) == int64(latestBlockHeight) { + if int64(nextHeight) >= int64(latestBlockHeight) { time.Sleep(common.ListenerPauseTime) return nextHeight, nil } @@ -340,8 +346,18 @@ func (l *GreenfieldListener) PurgeLoop() { logging.Logger.Errorf("failed to delete gnfd transactions, err=%s", err.Error()) continue } - if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(threshHold, uint32(votepool.ToBscCrossChainEvent), DeletionLimit); err != nil { + var eventType votepool.EventType + if l.config.BSCConfig.IsOpCrossChain() { + eventType = votepool.ToOpCrossChainEvent + } else { + eventType = votepool.ToBscCrossChainEvent + } + if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(threshHold, uint32(eventType), DeletionLimit); err != nil { logging.Logger.Errorf("failed to delete votes, err=%s", err.Error()) } } } + +func (l *GreenfieldListener) destChainId() uint32 { + return uint32(l.config.BSCConfig.ChainId) +} diff --git a/vote/bsc_vote_processor.go b/vote/bsc_vote_processor.go index 263ff33..58d673e 100644 --- a/vote/bsc_vote_processor.go +++ b/vote/bsc_vote_processor.go @@ -33,15 +33,23 @@ type BSCVoteProcessor struct { signer *VoteSigner bscExecutor *executor.BSCExecutor blsPublicKey []byte + eventType votepool.EventType } func NewBSCVoteProcessor(cfg *config.Config, dao *dao.DaoManager, signer *VoteSigner, bscExecutor *executor.BSCExecutor) *BSCVoteProcessor { + var eventType votepool.EventType + if cfg.BSCConfig.IsOpCrossChain() { + eventType = votepool.FromOpCrossChainEvent + } else { + eventType = votepool.FromBscCrossChainEvent + } return &BSCVoteProcessor{ config: cfg, daoManager: dao, signer: signer, bscExecutor: bscExecutor, blsPublicKey: bscExecutor.GreenfieldExecutor.BlsPubKey, + eventType: eventType, } } @@ -56,12 +64,28 @@ func (p *BSCVoteProcessor) SignAndBroadcastVoteLoop() { // SignAndBroadcastVoteLoop signs using the bls private key, and broadcast the vote to votepool func (p *BSCVoteProcessor) signAndBroadcast() error { + var ( + latestHeight uint64 + err error + ) + if p.isOpCrossChain() { + latestHeight, err = p.bscExecutor.GetLatestBlockHeightWithRetry() + if err != nil { + logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error()) + return err + } + } // need to keep track of the height so that make sure that we aggregate packages are from only 1 block. leastSavedPkgHeight, err := p.daoManager.BSCDao.GetLeastSavedPackagesHeight() if err != nil { return fmt.Errorf("failed to get least saved packages' height, error: %s", err.Error()) - } + if p.isOpCrossChain() { + if leastSavedPkgHeight+p.config.BSCConfig.NumberOfBlocksForFinality > latestHeight { + return nil + } + } + pkgs, err := p.daoManager.BSCDao.GetPackagesByHeightAndStatus(db.Saved, leastSavedPkgHeight) if err != nil { return fmt.Errorf("failed to get packages at height %d from db, error: %s", leastSavedPkgHeight, err.Error()) @@ -271,7 +295,7 @@ func (p *BSCVoteProcessor) queryMoreThanTwoThirdValidVotes(localVote *model.Vote if triedTimes > QueryVotepoolMaxRetryTimes { return fmt.Errorf("exceed max retry=%d", QueryVotepoolMaxRetryTimes) } - queriedVotes, err := p.bscExecutor.GreenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, votepool.FromBscCrossChainEvent) + queriedVotes, err := p.bscExecutor.GreenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, p.eventType) if err != nil { return fmt.Errorf("failed to query votes. eventHash=%s", hex.EncodeToString(localVote.EventHash)) } @@ -333,7 +357,7 @@ func (p *BSCVoteProcessor) queryMoreThanTwoThirdValidVotes(localVote *model.Vote func (p *BSCVoteProcessor) constructSignedVote(eventHash []byte) *votepool.Vote { var v votepool.Vote - v.EventType = votepool.FromBscCrossChainEvent + v.EventType = p.eventType v.EventHash = eventHash p.signer.SignVote(&v) return &v @@ -367,3 +391,7 @@ func (p *BSCVoteProcessor) reBroadcastVote(localVote *model.Vote) error { func (p *BSCVoteProcessor) getChainId() sdk.ChainID { return sdk.ChainID(p.config.BSCConfig.ChainId) } + +func (p *BSCVoteProcessor) isOpCrossChain() bool { + return p.config.BSCConfig.IsOpCrossChain() +} diff --git a/vote/greenfield_vote_processor.go b/vote/greenfield_vote_processor.go index 9672728..02ac7c0 100644 --- a/vote/greenfield_vote_processor.go +++ b/vote/greenfield_vote_processor.go @@ -34,16 +34,24 @@ type GreenfieldVoteProcessor struct { signer *VoteSigner greenfieldExecutor *executor.GreenfieldExecutor blsPublicKey []byte + eventType votepool.EventType } func NewGreenfieldVoteProcessor(cfg *config.Config, dao *dao.DaoManager, signer *VoteSigner, greenfieldExecutor *executor.GreenfieldExecutor) *GreenfieldVoteProcessor { + var eventType votepool.EventType + if cfg.BSCConfig.IsOpCrossChain() { + eventType = votepool.ToOpCrossChainEvent + } else { + eventType = votepool.ToBscCrossChainEvent + } return &GreenfieldVoteProcessor{ config: cfg, daoManager: dao, signer: signer, greenfieldExecutor: greenfieldExecutor, blsPublicKey: greenfieldExecutor.BlsPubKey, + eventType: eventType, } } @@ -219,14 +227,12 @@ func (p *GreenfieldVoteProcessor) queryMoreThanTwoThirdVotesForTx(localVote *mod channelId := localVote.ChannelId seq := localVote.Sequence ticker := time.NewTicker(VotePoolQueryRetryInterval) - for range ticker.C { triedTimes++ if triedTimes > QueryVotepoolMaxRetryTimes { return fmt.Errorf("exceed max retry=%d", QueryVotepoolMaxRetryTimes) } - - queriedVotes, err := p.greenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, votepool.ToBscCrossChainEvent) + queriedVotes, err := p.greenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, p.eventType) if err != nil { return fmt.Errorf("failed to query votes. eventHash=%s", hex.EncodeToString(localVote.EventHash)) } @@ -291,7 +297,7 @@ func (p *GreenfieldVoteProcessor) queryMoreThanTwoThirdVotesForTx(localVote *mod func (p *GreenfieldVoteProcessor) constructVoteAndSign(aggregatedPayload []byte) *votepool.Vote { var v votepool.Vote - v.EventType = votepool.ToBscCrossChainEvent + v.EventType = p.eventType v.EventHash = p.getEventHash(aggregatedPayload) p.signer.SignVote(&v) return &v