Skip to content

Commit

Permalink
support OPBNB crosschain
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Oct 18, 2023
1 parent 6afcf08 commit f5b9b1c
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 26 deletions.
2 changes: 1 addition & 1 deletion common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
OracleChannelId types.ChannelId = 0
SleepTimeAfterSyncLightBlock = 15 * time.Second

ListenerPauseTime = 2 * time.Second
ListenerPauseTime = 3 * time.Second
ErrorRetryInterval = 1 * time.Second
AssembleInterval = 500 * time.Millisecond

Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (cfg *GreenfieldConfig) Validate() {
}

type BSCConfig struct {
OpBNB bool `json:"op_bnb"`
KeyType string `json:"key_type"`
AWSRegion string `json:"aws_region"`
AWSSecretName string `json:"aws_secret_name"`
Expand Down Expand Up @@ -110,6 +111,10 @@ func (cfg *BSCConfig) Validate() {
}
}

func (cfg *BSCConfig) IsOpCrossChain() bool {
return cfg.OpBNB
}

type RelayConfig struct {
BSCToGreenfieldInturnRelayerTimeout int64 `json:"bsc_to_greenfield_inturn_relayer_timeout"` // in second
GreenfieldToBSCInturnRelayerTimeout int64 `json:"greenfield_to_bsc_inturn_relayer_timeout"` // in second
Expand Down
1 change: 1 addition & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"use_websocket": true
},
"bsc_config": {
"op_bnb": false,
"key_type": "local_private_key",
"aws_region": "",
"aws_secret_name": "",
Expand Down
12 changes: 2 additions & 10 deletions executor/bsc_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,10 @@ func (e *BSCExecutor) getTransactor(nonce uint64) (*bind.TransactOpts, error) {
txOpts.Nonce = big.NewInt(int64(nonce))
txOpts.Value = big.NewInt(0)
txOpts.GasLimit = e.config.BSCConfig.GasLimit
txOpts.GasPrice = e.getGasPrice()
txOpts.GasPrice = e.gasPrice
return txOpts, nil
}

func (e *BSCExecutor) getGasPrice() *big.Int {
return e.gasPrice
}

func (e *BSCExecutor) SyncTendermintLightBlock(height uint64) (common.Hash, error) {
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
Expand Down Expand Up @@ -461,11 +457,7 @@ func (e *BSCExecutor) QueryCachedLatestValidators() ([]rtypes.Validator, error)
if len(e.relayers) != 0 {
return e.relayers, nil
}
relayers, err := e.QueryLatestValidators()
if err != nil {
return nil, err
}
return relayers, nil
return e.QueryLatestValidators()
}

func (e *BSCExecutor) UpdateCachedLatestValidatorsLoop() {
Expand Down
1 change: 0 additions & 1 deletion executor/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ const (
var (
BSCBalanceThreshold = big.NewInt(1000000000000000000) // when relayer is lower than 1BNB, it should try to claim rewards
BSCRewardThreshold = big.NewInt(100000000000000000) // if reward is lower than 0.1 BNB, it will not be claimed.

)
49 changes: 44 additions & 5 deletions listener/bsc_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ func (l *BSCListener) poll() error {
if nextHeight <= latestPolledBlockHeight {
nextHeight = latestPolledBlockHeight + 1
}
latestBlockHeight, err := l.bscExecutor.GetLatestFinalizedBlockHeightWithRetry()
var latestBlockHeight uint64
if l.isOpCrossChain() {
// currently Get finalized block is not support by OPBNB yet
latestBlockHeight, err = l.bscExecutor.GetLatestBlockHeightWithRetry()
} else {
latestBlockHeight, err = l.bscExecutor.GetLatestFinalizedBlockHeightWithRetry()
}
if err != nil {
logging.Logger.Errorf("failed to get latest finalized blockHeight, error: %s", err.Error())
return err
Expand All @@ -78,7 +84,7 @@ func (l *BSCListener) poll() error {
return nil
}
}
if err = l.monitorCrossChainPkgAt(nextHeight); err != nil {
if err = l.monitorCrossChainPkgAt(nextHeight, latestPolledBlock); err != nil {
return err
}
return nil
Expand All @@ -88,7 +94,7 @@ func (l *BSCListener) getLatestPolledBlock() (*model.BscBlock, error) {
return l.DaoManager.BSCDao.GetLatestBlock()
}

func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error {
func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64, latestPolledBlock *model.BscBlock) error {
nextHeightBlockHeader, err := l.bscExecutor.GetBlockHeaderAtHeight(nextHeight)
if err != nil {
return fmt.Errorf("failed to get latest block header, error: %s", err.Error())
Expand All @@ -98,6 +104,18 @@ func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error {
return nil
}
logging.Logger.Infof("retrieved BSC block header at height=%d", nextHeight)

if l.config.BSCConfig.IsOpCrossChain() {
// check if the latest polled block in DB is forked, if so, delete it.
isForked, err := l.isForkedBlockAndDelete(latestPolledBlock, nextHeight, nextHeightBlockHeader.ParentHash)
if err != nil {
return err
}
if isForked {
return fmt.Errorf("there is fork at block height=%d", latestPolledBlock.Height)
}
}

logs, err := l.queryCrossChainLogs(nextHeightBlockHeader.Hash())
if err != nil {
return fmt.Errorf("failed to get logs from block at height=%d, err=%s", nextHeight, err.Error())
Expand All @@ -114,7 +132,6 @@ func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64) error {
logging.Logger.Errorf("failed to parse event log, txHash=%s, err=%s", log.TxHash, err.Error())
continue
}

if relayPkg == nil {
continue
}
Expand Down Expand Up @@ -148,6 +165,18 @@ func (l *BSCListener) queryCrossChainLogs(blockHash ethcommon.Hash) ([]types.Log
return logs, nil
}

func (l *BSCListener) isForkedBlockAndDelete(latestPolledBlock *model.BscBlock, nextHeight uint64, parentHash ethcommon.Hash) (bool, error) {
if latestPolledBlock.Height != 0 && latestPolledBlock.Height+1 == nextHeight && parentHash.String() != latestPolledBlock.BlockHash {
// delete latestPolledBlock and its cross-chain packages from DB
if err := l.DaoManager.BSCDao.DeleteBlockAndPackagesAtHeight(latestPolledBlock.Height); err != nil {
return true, err
}
logging.Logger.Infof("deleted block at height=%d from DB due to there is a fork", latestPolledBlock.Height)
return true, nil
}
return false, nil
}

func (l *BSCListener) getCrossChainPackageEventHash() ethcommon.Hash {
return ethcommon.HexToHash(CrossChainPackageEventHex)
}
Expand Down Expand Up @@ -180,8 +209,18 @@ func (l *BSCListener) PurgeLoop() {
logging.Logger.Errorf("failed to delete bsc packages, err=%s", err.Error())
continue
}
if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(blockHeightThreshHold, uint32(votepool.FromBscCrossChainEvent), DeletionLimit); err != nil {
var eventType votepool.EventType
if l.isOpCrossChain() {
eventType = votepool.FromOpCrossChainEvent
} else {
eventType = votepool.FromBscCrossChainEvent
}
if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(blockHeightThreshHold, uint32(eventType), DeletionLimit); err != nil {
logging.Logger.Errorf("failed to delete votes, err=%s", err.Error())
}
}
}

func (l *BSCListener) isOpCrossChain() bool {
return l.config.BSCConfig.IsOpCrossChain()
}
20 changes: 18 additions & 2 deletions listener/greenfield_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (l *GreenfieldListener) monitorTxEvents(block *tmtypes.Block, txRes []*abci
errChan <- err
return
}
if relayTx.DestChainId != l.destChainId() {
break
}
relayTx.TxHash = hex.EncodeToString(block.Txs[idx].Hash())
txChan <- relayTx
}
Expand All @@ -138,6 +141,9 @@ func (l *GreenfieldListener) monitorEndBlockEvents(height uint64, endBlockEvents
errChan <- err
return
}
if relayTx.DestChainId != l.destChainId() {
break
}
txChan <- relayTx
}
}
Expand Down Expand Up @@ -222,7 +228,7 @@ func (l *GreenfieldListener) calNextHeight() (uint64, error) {
return 0, fmt.Errorf("failed to get latest block height, error: %s", err.Error())
}
// pauses relayer for a bit since it already caught the newest block
if int64(nextHeight) == int64(latestBlockHeight) {
if int64(nextHeight) >= int64(latestBlockHeight) {
time.Sleep(common.ListenerPauseTime)
return nextHeight, nil
}
Expand Down Expand Up @@ -340,8 +346,18 @@ func (l *GreenfieldListener) PurgeLoop() {
logging.Logger.Errorf("failed to delete gnfd transactions, err=%s", err.Error())
continue
}
if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(threshHold, uint32(votepool.ToBscCrossChainEvent), DeletionLimit); err != nil {
var eventType votepool.EventType
if l.config.BSCConfig.IsOpCrossChain() {
eventType = votepool.ToOpCrossChainEvent
} else {
eventType = votepool.ToBscCrossChainEvent
}
if err = l.DaoManager.VoteDao.DeleteVotesBelowHeightWithLimit(threshHold, uint32(eventType), DeletionLimit); err != nil {
logging.Logger.Errorf("failed to delete votes, err=%s", err.Error())
}
}
}

func (l *GreenfieldListener) destChainId() uint32 {
return uint32(l.config.BSCConfig.ChainId)
}
34 changes: 31 additions & 3 deletions vote/bsc_vote_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,23 @@ type BSCVoteProcessor struct {
signer *VoteSigner
bscExecutor *executor.BSCExecutor
blsPublicKey []byte
eventType votepool.EventType
}

func NewBSCVoteProcessor(cfg *config.Config, dao *dao.DaoManager, signer *VoteSigner, bscExecutor *executor.BSCExecutor) *BSCVoteProcessor {
var eventType votepool.EventType
if cfg.BSCConfig.IsOpCrossChain() {
eventType = votepool.FromOpCrossChainEvent
} else {
eventType = votepool.FromBscCrossChainEvent
}
return &BSCVoteProcessor{
config: cfg,
daoManager: dao,
signer: signer,
bscExecutor: bscExecutor,
blsPublicKey: bscExecutor.GreenfieldExecutor.BlsPubKey,
eventType: eventType,
}
}

Expand All @@ -56,12 +64,28 @@ func (p *BSCVoteProcessor) SignAndBroadcastVoteLoop() {

// SignAndBroadcastVoteLoop signs using the bls private key, and broadcast the vote to votepool
func (p *BSCVoteProcessor) signAndBroadcast() error {
var (
latestHeight uint64
err error
)
if p.isOpCrossChain() {
latestHeight, err = p.bscExecutor.GetLatestBlockHeightWithRetry()
if err != nil {
logging.Logger.Errorf("failed to get latest block height, error: %s", err.Error())
return err
}
}
// need to keep track of the height so that make sure that we aggregate packages are from only 1 block.
leastSavedPkgHeight, err := p.daoManager.BSCDao.GetLeastSavedPackagesHeight()
if err != nil {
return fmt.Errorf("failed to get least saved packages' height, error: %s", err.Error())

}
if p.isOpCrossChain() {
if leastSavedPkgHeight+p.config.BSCConfig.NumberOfBlocksForFinality > latestHeight {
return nil
}
}

pkgs, err := p.daoManager.BSCDao.GetPackagesByHeightAndStatus(db.Saved, leastSavedPkgHeight)
if err != nil {
return fmt.Errorf("failed to get packages at height %d from db, error: %s", leastSavedPkgHeight, err.Error())
Expand Down Expand Up @@ -271,7 +295,7 @@ func (p *BSCVoteProcessor) queryMoreThanTwoThirdValidVotes(localVote *model.Vote
if triedTimes > QueryVotepoolMaxRetryTimes {
return fmt.Errorf("exceed max retry=%d", QueryVotepoolMaxRetryTimes)
}
queriedVotes, err := p.bscExecutor.GreenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, votepool.FromBscCrossChainEvent)
queriedVotes, err := p.bscExecutor.GreenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, p.eventType)
if err != nil {
return fmt.Errorf("failed to query votes. eventHash=%s", hex.EncodeToString(localVote.EventHash))
}
Expand Down Expand Up @@ -333,7 +357,7 @@ func (p *BSCVoteProcessor) queryMoreThanTwoThirdValidVotes(localVote *model.Vote

func (p *BSCVoteProcessor) constructSignedVote(eventHash []byte) *votepool.Vote {
var v votepool.Vote
v.EventType = votepool.FromBscCrossChainEvent
v.EventType = p.eventType
v.EventHash = eventHash
p.signer.SignVote(&v)
return &v
Expand Down Expand Up @@ -367,3 +391,7 @@ func (p *BSCVoteProcessor) reBroadcastVote(localVote *model.Vote) error {
func (p *BSCVoteProcessor) getChainId() sdk.ChainID {
return sdk.ChainID(p.config.BSCConfig.ChainId)
}

func (p *BSCVoteProcessor) isOpCrossChain() bool {
return p.config.BSCConfig.IsOpCrossChain()
}
14 changes: 10 additions & 4 deletions vote/greenfield_vote_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,24 @@ type GreenfieldVoteProcessor struct {
signer *VoteSigner
greenfieldExecutor *executor.GreenfieldExecutor
blsPublicKey []byte
eventType votepool.EventType
}

func NewGreenfieldVoteProcessor(cfg *config.Config, dao *dao.DaoManager, signer *VoteSigner,
greenfieldExecutor *executor.GreenfieldExecutor) *GreenfieldVoteProcessor {
var eventType votepool.EventType
if cfg.BSCConfig.IsOpCrossChain() {
eventType = votepool.ToOpCrossChainEvent
} else {
eventType = votepool.ToBscCrossChainEvent
}
return &GreenfieldVoteProcessor{
config: cfg,
daoManager: dao,
signer: signer,
greenfieldExecutor: greenfieldExecutor,
blsPublicKey: greenfieldExecutor.BlsPubKey,
eventType: eventType,
}
}

Expand Down Expand Up @@ -219,14 +227,12 @@ func (p *GreenfieldVoteProcessor) queryMoreThanTwoThirdVotesForTx(localVote *mod
channelId := localVote.ChannelId
seq := localVote.Sequence
ticker := time.NewTicker(VotePoolQueryRetryInterval)

for range ticker.C {
triedTimes++
if triedTimes > QueryVotepoolMaxRetryTimes {
return fmt.Errorf("exceed max retry=%d", QueryVotepoolMaxRetryTimes)
}

queriedVotes, err := p.greenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, votepool.ToBscCrossChainEvent)
queriedVotes, err := p.greenfieldExecutor.QueryVotesByEventHashAndType(localVote.EventHash, p.eventType)
if err != nil {
return fmt.Errorf("failed to query votes. eventHash=%s", hex.EncodeToString(localVote.EventHash))
}
Expand Down Expand Up @@ -291,7 +297,7 @@ func (p *GreenfieldVoteProcessor) queryMoreThanTwoThirdVotesForTx(localVote *mod

func (p *GreenfieldVoteProcessor) constructVoteAndSign(aggregatedPayload []byte) *votepool.Vote {
var v votepool.Vote
v.EventType = votepool.ToBscCrossChainEvent
v.EventType = p.eventType
v.EventHash = p.getEventHash(aggregatedPayload)
p.signer.SignVote(&v)
return &v
Expand Down

0 comments on commit f5b9b1c

Please sign in to comment.