Skip to content

Commit

Permalink
feat: perf enhancement by reducing rpc calls (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 authored Mar 24, 2023
1 parent 7a02e10 commit d0a6195
Show file tree
Hide file tree
Showing 15 changed files with 153 additions and 97 deletions.
44 changes: 26 additions & 18 deletions assembler/bsc_assembler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package assembler

import (
"bytes"
"encoding/hex"
"fmt"
"github.com/bnb-chain/greenfield-relayer/metric"
"time"

"github.com/bnb-chain/greenfield-relayer/common"
Expand All @@ -13,18 +13,20 @@ import (
"github.com/bnb-chain/greenfield-relayer/db/model"
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/logging"
"github.com/bnb-chain/greenfield-relayer/metric"
"github.com/bnb-chain/greenfield-relayer/types"
"github.com/bnb-chain/greenfield-relayer/util"
"github.com/bnb-chain/greenfield-relayer/vote"
)

type BSCAssembler struct {
config *config.Config
greenfieldExecutor *executor.GreenfieldExecutor
bscExecutor *executor.BSCExecutor
daoManager *dao.DaoManager
blsPubKey string
metricService *metric.MetricService
config *config.Config
greenfieldExecutor *executor.GreenfieldExecutor
bscExecutor *executor.BSCExecutor
daoManager *dao.DaoManager
blsPubKey []byte
hasRetrievedSequence bool
metricService *metric.MetricService
}

func NewBSCAssembler(cfg *config.Config, executor *executor.BSCExecutor, dao *dao.DaoManager, greenfieldExecutor *executor.GreenfieldExecutor, ms *metric.MetricService) *BSCAssembler {
Expand All @@ -33,7 +35,7 @@ func NewBSCAssembler(cfg *config.Config, executor *executor.BSCExecutor, dao *da
bscExecutor: executor,
daoManager: dao,
greenfieldExecutor: greenfieldExecutor,
blsPubKey: hex.EncodeToString(util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey)),
blsPubKey: util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey),
metricService: ms,
}
}
Expand All @@ -57,25 +59,32 @@ func (a *BSCAssembler) process(channelId types.ChannelId) error {
if err != nil {
return err
}
isInturnRelyer := inturnRelayer.BlsPubKey == a.blsPubKey
inturnRelayerPubkey, err := hex.DecodeString(inturnRelayer.BlsPubKey)
if err != nil {
return err
}
isInturnRelyer := bytes.Equal(a.blsPubKey, inturnRelayerPubkey)
a.metricService.SetGnfdInturnRelayerMetrics(isInturnRelyer, inturnRelayer.RelayInterval.Start, inturnRelayer.RelayInterval.End)
var startSequence uint64

if isInturnRelyer {
seq, err := a.daoManager.SequenceDao.GetByChannelId(uint8(channelId))
if err != nil {
return err
}
startSequence = uint64(seq.Sequence)

// in-turn relayer get the start sequence from chain first time, it starts to relay after the sequence gets updated
now := time.Now().Unix()
timeDiff := now - int64(inturnRelayer.RelayInterval.Start)
if !a.hasRetrievedSequence {
// in-turn relayer get the start sequence from chain first time, it starts to relay after the sequence gets updated
now := time.Now().Unix()
timeDiff := now - int64(inturnRelayer.RelayInterval.Start)

if timeDiff < a.config.RelayConfig.GreenfieldSequenceUpdateLatency {
if timeDiff < 0 {
return fmt.Errorf("blockchain time and relayer time is not consistent, now %d should be after %d", now, inturnRelayer.RelayInterval.Start)
if timeDiff < a.config.RelayConfig.GreenfieldSequenceUpdateLatency {
if timeDiff < 0 {
return fmt.Errorf("blockchain time and relayer time is not consistent, now %d should be after %d", now, inturnRelayer.RelayInterval.Start)
}
return nil
}
time.Sleep(time.Duration(timeDiff) * time.Second)
startSequence, err = a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
Expand All @@ -84,21 +93,20 @@ func (a *BSCAssembler) process(channelId types.ChannelId) error {
return err
}
}
logging.Logger.Debug("bsc relay as in-turn relayer")
a.metricService.SetNextSequenceForChannelFromDB(uint8(channelId), startSequence)
seqFromChain, err := a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
}
a.metricService.SetNextSequenceForChannelFromChain(uint8(channelId), seqFromChain)
} else {
a.hasRetrievedSequence = false
// non-inturn relayer retries every 10 second, gets the sequence from chain
time.Sleep(time.Duration(a.config.RelayConfig.GreenfieldSequenceUpdateLatency) * time.Second)
startSequence, err = a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
}
logging.Logger.Debug("bsc relay as out-turn relayer")
if err := a.daoManager.BSCDao.UpdateBatchPackagesStatusToDelivered(startSequence); err != nil {
return err
}
Expand Down
101 changes: 62 additions & 39 deletions assembler/greenfield_assembler.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package assembler

import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/bnb-chain/greenfield-relayer/common"
Expand All @@ -19,76 +21,96 @@ import (
)

type GreenfieldAssembler struct {
config *config.Config
bscExecutor *executor.BSCExecutor
greenfieldExecutor *executor.GreenfieldExecutor
daoManager *dao.DaoManager
blsPubKey string
metricService *metric.MetricService
mutex sync.RWMutex
config *config.Config
bscExecutor *executor.BSCExecutor
greenfieldExecutor *executor.GreenfieldExecutor
daoManager *dao.DaoManager
blsPubKey []byte
hasRetrievedSequenceByChannelMap map[types.ChannelId]bool // flag for in-turn relayer that if it has requested the sequence from chain during its interval
metricService *metric.MetricService
}

func NewGreenfieldAssembler(cfg *config.Config, executor *executor.GreenfieldExecutor, dao *dao.DaoManager, bscExecutor *executor.BSCExecutor,
ms *metric.MetricService) *GreenfieldAssembler {
channels := cfg.GreenfieldConfig.MonitorChannelList
retrievedSequenceByChannelMap := make(map[types.ChannelId]bool)
for _, c := range channels {
retrievedSequenceByChannelMap[types.ChannelId(c)] = false
}
return &GreenfieldAssembler{
config: cfg,
greenfieldExecutor: executor,
daoManager: dao,
bscExecutor: bscExecutor,
blsPubKey: hex.EncodeToString(util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey)),
metricService: ms,
config: cfg,
greenfieldExecutor: executor,
daoManager: dao,
bscExecutor: bscExecutor,
blsPubKey: util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey),
hasRetrievedSequenceByChannelMap: retrievedSequenceByChannelMap,
metricService: ms,
}
}

// AssembleTransactionsLoop assemble a tx by gathering votes signature and then call the build-in smart-contract
func (a *GreenfieldAssembler) AssembleTransactionsLoop() {
for _, c := range a.getMonitorChannels() {
go a.assembleTransactionAndSendForChannel(types.ChannelId(c))
}
}

func (a *GreenfieldAssembler) assembleTransactionAndSendForChannel(channelId types.ChannelId) {
ticker := time.NewTicker(common.RetryInterval)
for range ticker.C {
if err := a.process(channelId); err != nil {
logging.Logger.Errorf("encounter error when relaying tx, err=%s ", err.Error())
inturnRelayer, err := a.bscExecutor.GetInturnRelayer()
if err != nil {
logging.Logger.Errorf("encounter error when retrieving in-turn relayer from chain, err=%s ", err.Error())
continue
}
wg := new(sync.WaitGroup)
errChan := make(chan error)
for _, c := range a.getMonitorChannels() {
wg.Add(1)
go a.assembleTransactionAndSendForChannel(types.ChannelId(c), inturnRelayer, errChan, wg)
}
wg.Wait()
}
}

func (a *GreenfieldAssembler) assembleTransactionAndSendForChannel(channelId types.ChannelId, inturnRelayer *types.InturnRelayer, errChan chan error, wg *sync.WaitGroup) {
defer wg.Done()
err := a.process(channelId, inturnRelayer)
if err != nil {
errChan <- err
}
}

func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
logging.Logger.Infof("current time is %d", time.Now().Unix())
inturnRelayer, err := a.bscExecutor.GetInturnRelayer()
func (a *GreenfieldAssembler) process(channelId types.ChannelId, inturnRelayer *types.InturnRelayer) error {
var startSequence uint64
inturnRelayerPubkey, err := hex.DecodeString(inturnRelayer.BlsPublicKey)
if err != nil {
return err
}
isInturnRelyer := inturnRelayer.BlsPublicKey == a.blsPubKey
isInturnRelyer := bytes.Equal(a.blsPubKey, inturnRelayerPubkey)
a.metricService.SetBSCInturnRelayerMetrics(isInturnRelyer, inturnRelayer.Start, inturnRelayer.End)
var startSequence uint64
if isInturnRelyer {
// get next delivered sequence from DB
seq, err := a.daoManager.SequenceDao.GetByChannelId(uint8(channelId))
if err != nil {
return err
}
startSequence = uint64(seq.Sequence)

// in-turn relayer get the start sequence from chain first time, it starts to relay after the sequence
// get updated
now := time.Now().Unix()
timeDiff := now - int64(inturnRelayer.Start)

if timeDiff < a.config.RelayConfig.BSCSequenceUpdateLatency {
if timeDiff < 0 {
return fmt.Errorf("blockchain time and relayer time is not consistent, now %d should be after %d", now, inturnRelayer.Start)
// in-turn relayer get the start sequence from chain once during its interval
if !a.hasRetrievedSequenceByChannelMap[channelId] {
now := time.Now().Unix()
timeDiff := now - int64(inturnRelayer.Start)
if timeDiff < a.config.RelayConfig.BSCSequenceUpdateLatency {
if timeDiff < 0 {
return fmt.Errorf("blockchain time and relayer time is not consistent, now %d should be after %d", now, inturnRelayer.Start)
}
return nil
}
time.Sleep(time.Duration(timeDiff) * time.Second)
startSequence, err = a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
if err != nil {
return err
}
if err = a.daoManager.SequenceDao.Upsert(uint8(channelId), startSequence); err != nil {
return err
}
a.mutex.Lock()
a.hasRetrievedSequenceByChannelMap[channelId] = true
a.mutex.Unlock()
}
a.metricService.SetNextSequenceForChannelFromDB(uint8(channelId), startSequence)
seqFromChain, err := a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
Expand All @@ -97,12 +119,15 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
}
a.metricService.SetNextSequenceForChannelFromChain(uint8(channelId), seqFromChain)
} else {
a.mutex.Lock()
a.hasRetrievedSequenceByChannelMap[channelId] = false
a.mutex.Unlock()

time.Sleep(time.Duration(a.config.RelayConfig.BSCSequenceUpdateLatency) * time.Second)
startSequence, err = a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
startSequence, err := a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
if err != nil {
return err
}
logging.Logger.Debug("gnfd relay as out-turn relayer")
if err := a.daoManager.GreenfieldDao.UpdateBatchTransactionStatusToDelivered(startSequence); err != nil {
return err
}
Expand All @@ -115,7 +140,6 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
if endSequence == -1 {
return nil
}
logging.Logger.Debugf("channel %d start seq is %d, end seq is %d ", channelId, startSequence, endSequence)
nonce, err := a.bscExecutor.GetNonce()
if err != nil {
return err
Expand All @@ -141,7 +165,6 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
return err
}
logging.Logger.Infof("relayed tx with channel id %d and sequence %d ", tx.ChannelId, tx.Sequence)

nonce++
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"greenfield_event_type_cross_chain": "cosmos.crosschain.v1.EventCrossChain",
"bsc_cross_chain_package_event_name": "CrossChainPackage",
"cross_chain_package_event_hex": "0x64998dc5a229e7324e622192f111c691edccc3534bbea4b2bd90fbaec936845a",
"cross_chain_contract_addr": "0xd2253A26e6d5b729dDBf4bCce5A78F93C725b455",
"greenfield_light_client_contract_addr": "0x349a42f907c7562B3aaD4431780E4596bC2a053f"
"cross_chain_contract_addr": "0x3a282380958194D1131bC49056abb712Ab98b82B",
"greenfield_light_client_contract_addr": "0x60B1E6259944Ea8CEEfFAe2d50Df33EE3CCc593A"
},
"log_config": {
"level": "DEBUG",
Expand All @@ -54,7 +54,7 @@
"compress": false
},
"admin_config": {
"listen_addr": "0.0.0.0:8080"
"port": 8080
},
"db_config": {
"dialect": "mysql",
Expand Down
6 changes: 3 additions & 3 deletions config/local/config_local_0.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"greenfield_event_type_cross_chain": "cosmos.crosschain.v1.EventCrossChain",
"bsc_cross_chain_package_event_name": "CrossChainPackage",
"cross_chain_package_event_hex": "0x64998dc5a229e7324e622192f111c691edccc3534bbea4b2bd90fbaec936845a",
"cross_chain_contract_addr": "0xd2253A26e6d5b729dDBf4bCce5A78F93C725b455",
"greenfield_light_client_contract_addr": "0x349a42f907c7562B3aaD4431780E4596bC2a053f"
"cross_chain_contract_addr": "0x3a282380958194D1131bC49056abb712Ab98b82B",
"greenfield_light_client_contract_addr": "0x60B1E6259944Ea8CEEfFAe2d50Df33EE3CCc593A"
},
"log_config": {
"level": "DEBUG",
Expand All @@ -54,7 +54,7 @@
"compress": false
},
"admin_config": {
"listen_addr": "0.0.0.0:8080"
"port": 8080
},
"db_config": {
"dialect": "mysql",
Expand Down
6 changes: 3 additions & 3 deletions config/local/config_local_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"greenfield_event_type_cross_chain": "cosmos.crosschain.v1.EventCrossChain",
"bsc_cross_chain_package_event_name": "CrossChainPackage",
"cross_chain_package_event_hex": "0x64998dc5a229e7324e622192f111c691edccc3534bbea4b2bd90fbaec936845a",
"cross_chain_contract_addr": "0xd2253A26e6d5b729dDBf4bCce5A78F93C725b455",
"greenfield_light_client_contract_addr": "0x349a42f907c7562B3aaD4431780E4596bC2a053f"
"cross_chain_contract_addr": "0x3a282380958194D1131bC49056abb712Ab98b82B",
"greenfield_light_client_contract_addr": "0x60B1E6259944Ea8CEEfFAe2d50Df33EE3CCc593A"
},
"log_config": {
"level": "DEBUG",
Expand All @@ -54,7 +54,7 @@
"compress": false
},
"admin_config": {
"listen_addr": "0.0.0.0:8080"
"port": 8081
},
"db_config": {
"dialect": "mysql",
Expand Down
6 changes: 3 additions & 3 deletions config/local/config_local_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"greenfield_event_type_cross_chain": "cosmos.crosschain.v1.EventCrossChain",
"bsc_cross_chain_package_event_name": "CrossChainPackage",
"cross_chain_package_event_hex": "0x64998dc5a229e7324e622192f111c691edccc3534bbea4b2bd90fbaec936845a",
"cross_chain_contract_addr": "0xd2253A26e6d5b729dDBf4bCce5A78F93C725b455",
"greenfield_light_client_contract_addr": "0x349a42f907c7562B3aaD4431780E4596bC2a053f"
"cross_chain_contract_addr": "0x3a282380958194D1131bC49056abb712Ab98b82B",
"greenfield_light_client_contract_addr": "0x60B1E6259944Ea8CEEfFAe2d50Df33EE3CCc593A"
},
"log_config": {
"level": "DEBUG",
Expand All @@ -54,7 +54,7 @@
"compress": false
},
"admin_config": {
"listen_addr": "0.0.0.0:8080"
"port": 8082
},
"db_config": {
"dialect": "mysql",
Expand Down
9 changes: 9 additions & 0 deletions db/dao/greenfield_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,12 @@ func (d *GreenfieldDao) SaveSyncLightBlockTransaction(t *model.SyncLightBlockTra
return dbTx.Create(t).Error
})
}

func (d *GreenfieldDao) GetLatestSyncedTransaction() (*model.SyncLightBlockTransaction, error) {
tx := model.SyncLightBlockTransaction{}
err := d.DB.Model(model.SyncLightBlockTransaction{}).Order("height desc").Take(&tx).Error
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
return &tx, nil
}
2 changes: 1 addition & 1 deletion executor/greenfield_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (e *GreenfieldExecutor) GetValidatorsBlsPublicKey() ([]string, error) {
}
var keys []string
for _, v := range validators {
keys = append(keys, hex.EncodeToString(v.RelayerBlsKey))
keys = append(keys, hex.EncodeToString(v.BlsKey))
}
return keys, nil
}
Expand Down
Loading

0 comments on commit d0a6195

Please sign in to comment.