Skip to content

Commit

Permalink
feat: add metrics for relayer (#24)
Browse files Browse the repository at this point in the history
* add metrics for relayer

* re-format

* renaming
  • Loading branch information
alexgao001 authored Mar 24, 2023
1 parent 584ce85 commit 7a02e10
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 23 deletions.
22 changes: 14 additions & 8 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (
"github.com/bnb-chain/greenfield-relayer/db/model"
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/listener"
"github.com/bnb-chain/greenfield-relayer/metric"
"github.com/bnb-chain/greenfield-relayer/relayer"
"github.com/bnb-chain/greenfield-relayer/vote"
)

type App struct {
BSCRelayer *relayer.BSCRelayer
GnfdRelayer *relayer.GreenfieldRelayer
BSCRelayer *relayer.BSCRelayer
GnfdRelayer *relayer.GreenfieldRelayer
metricService *metric.MetricService
}

func NewApp(cfg *config.Config) *App {
Expand All @@ -44,9 +46,11 @@ func NewApp(cfg *config.Config) *App {
greenfieldExecutor.SetBSCExecutor(bscExecutor)
bscExecutor.SetGreenfieldExecutor(greenfieldExecutor)

metricService := metric.NewMetricService(cfg)

// listeners
greenfieldListener := listener.NewGreenfieldListener(cfg, greenfieldExecutor, bscExecutor, daoManager)
bscListener := listener.NewBSCListener(cfg, bscExecutor, greenfieldExecutor, daoManager)
greenfieldListener := listener.NewGreenfieldListener(cfg, greenfieldExecutor, bscExecutor, daoManager, metricService)
bscListener := listener.NewBSCListener(cfg, bscExecutor, greenfieldExecutor, daoManager, metricService)

// vote signer
signer := vote.NewVoteSigner(ethcommon.Hex2Bytes(cfg.GreenfieldConfig.BlsPrivateKey))
Expand All @@ -56,20 +60,22 @@ func NewApp(cfg *config.Config) *App {
bscVoteProcessor := vote.NewBSCVoteProcessor(cfg, daoManager, signer, bscExecutor)

// assemblers
greenfieldAssembler := assembler.NewGreenfieldAssembler(cfg, greenfieldExecutor, daoManager, bscExecutor)
bscAssembler := assembler.NewBSCAssembler(cfg, bscExecutor, daoManager, greenfieldExecutor)
greenfieldAssembler := assembler.NewGreenfieldAssembler(cfg, greenfieldExecutor, daoManager, bscExecutor, metricService)
bscAssembler := assembler.NewBSCAssembler(cfg, bscExecutor, daoManager, greenfieldExecutor, metricService)

// relayers
gnfdRelayer := relayer.NewGreenfieldRelayer(greenfieldListener, greenfieldExecutor, bscExecutor, greenfieldVoteProcessor, greenfieldAssembler)
bscRelayer := relayer.NewBSCRelayer(bscListener, greenfieldExecutor, bscExecutor, bscVoteProcessor, bscAssembler)

return &App{
BSCRelayer: bscRelayer,
GnfdRelayer: gnfdRelayer,
BSCRelayer: bscRelayer,
GnfdRelayer: gnfdRelayer,
metricService: metricService,
}
}

func (a *App) Start() {
a.GnfdRelayer.Start()
a.BSCRelayer.Start()
a.metricService.Start()
}
14 changes: 12 additions & 2 deletions assembler/bsc_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package assembler
import (
"encoding/hex"
"fmt"
"github.com/bnb-chain/greenfield-relayer/metric"
"time"

"github.com/bnb-chain/greenfield-relayer/common"
Expand All @@ -23,15 +24,17 @@ type BSCAssembler struct {
bscExecutor *executor.BSCExecutor
daoManager *dao.DaoManager
blsPubKey string
metricService *metric.MetricService
}

func NewBSCAssembler(cfg *config.Config, executor *executor.BSCExecutor, dao *dao.DaoManager, greenfieldExecutor *executor.GreenfieldExecutor) *BSCAssembler {
func NewBSCAssembler(cfg *config.Config, executor *executor.BSCExecutor, dao *dao.DaoManager, greenfieldExecutor *executor.GreenfieldExecutor, ms *metric.MetricService) *BSCAssembler {
return &BSCAssembler{
config: cfg,
bscExecutor: executor,
daoManager: dao,
greenfieldExecutor: greenfieldExecutor,
blsPubKey: hex.EncodeToString(util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey)),
metricService: ms,
}
}

Expand All @@ -55,6 +58,7 @@ func (a *BSCAssembler) process(channelId types.ChannelId) error {
return err
}
isInturnRelyer := inturnRelayer.BlsPubKey == a.blsPubKey
a.metricService.SetGnfdInturnRelayerMetrics(isInturnRelyer, inturnRelayer.RelayInterval.Start, inturnRelayer.RelayInterval.End)
var startSequence uint64
if isInturnRelyer {
seq, err := a.daoManager.SequenceDao.GetByChannelId(uint8(channelId))
Expand All @@ -81,6 +85,12 @@ func (a *BSCAssembler) process(channelId types.ChannelId) error {
}
}
logging.Logger.Debug("bsc relay as in-turn relayer")
a.metricService.SetNextSequenceForChannelFromDB(uint8(channelId), startSequence)
seqFromChain, err := a.bscExecutor.GetNextDeliveryOracleSequenceWithRetry()
if err != nil {
return err
}
a.metricService.SetNextSequenceForChannelFromChain(uint8(channelId), seqFromChain)
} else {
// non-inturn relayer retries every 10 second, gets the sequence from chain
time.Sleep(time.Duration(a.config.RelayConfig.GreenfieldSequenceUpdateLatency) * time.Second)
Expand Down Expand Up @@ -162,6 +172,7 @@ func (a *BSCAssembler) processPkgs(pkgs []*model.BscRelayPackage, channelId uint
for _, p := range pkgs {
pkgIds = append(pkgIds, p.Id)
}
a.metricService.SetBSCProcessedBlockHeight(pkgs[0].Height)
if !isInturnRelyer {
if err = a.daoManager.BSCDao.UpdateBatchPackagesClaimedTxHash(pkgIds, txHash); err != nil {
return err
Expand All @@ -176,6 +187,5 @@ func (a *BSCAssembler) processPkgs(pkgs []*model.BscRelayPackage, channelId uint
if err = a.daoManager.SequenceDao.Upsert(channelId, sequence+1); err != nil {
return err
}

return nil
}
17 changes: 14 additions & 3 deletions assembler/greenfield_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bnb-chain/greenfield-relayer/db/model"
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/logging"
"github.com/bnb-chain/greenfield-relayer/metric"
"github.com/bnb-chain/greenfield-relayer/types"
"github.com/bnb-chain/greenfield-relayer/util"
"github.com/bnb-chain/greenfield-relayer/vote"
Expand All @@ -23,15 +24,18 @@ type GreenfieldAssembler struct {
greenfieldExecutor *executor.GreenfieldExecutor
daoManager *dao.DaoManager
blsPubKey string
metricService *metric.MetricService
}

func NewGreenfieldAssembler(cfg *config.Config, executor *executor.GreenfieldExecutor, dao *dao.DaoManager, bscExecutor *executor.BSCExecutor) *GreenfieldAssembler {
func NewGreenfieldAssembler(cfg *config.Config, executor *executor.GreenfieldExecutor, dao *dao.DaoManager, bscExecutor *executor.BSCExecutor,
ms *metric.MetricService) *GreenfieldAssembler {
return &GreenfieldAssembler{
config: cfg,
greenfieldExecutor: executor,
daoManager: dao,
bscExecutor: bscExecutor,
blsPubKey: hex.EncodeToString(util.BlsPubKeyFromPrivKeyStr(cfg.GreenfieldConfig.BlsPrivateKey)),
metricService: ms,
}
}

Expand All @@ -58,6 +62,7 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
return err
}
isInturnRelyer := inturnRelayer.BlsPublicKey == a.blsPubKey
a.metricService.SetBSCInturnRelayerMetrics(isInturnRelyer, inturnRelayer.Start, inturnRelayer.End)
var startSequence uint64
if isInturnRelyer {
// get next delivered sequence from DB
Expand Down Expand Up @@ -85,7 +90,12 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
return err
}
}
logging.Logger.Debug("gnfd relay as in-turn relayer")
a.metricService.SetNextSequenceForChannelFromDB(uint8(channelId), startSequence)
seqFromChain, err := a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
if err != nil {
return err
}
a.metricService.SetNextSequenceForChannelFromChain(uint8(channelId), seqFromChain)
} else {
time.Sleep(time.Duration(a.config.RelayConfig.BSCSequenceUpdateLatency) * time.Second)
startSequence, err = a.greenfieldExecutor.GetNextDeliverySequenceForChannelWithRetry(channelId)
Expand Down Expand Up @@ -131,6 +141,7 @@ func (a *GreenfieldAssembler) process(channelId types.ChannelId) error {
return err
}
logging.Logger.Infof("relayed tx with channel id %d and sequence %d ", tx.ChannelId, tx.Sequence)

nonce++
}
return nil
Expand All @@ -157,6 +168,7 @@ func (a *GreenfieldAssembler) processTx(tx *model.GreenfieldRelayTransaction, no
return err
}
logging.Logger.Infof("relayed transaction with channel id %d and sequence %d, get txHash %s", tx.ChannelId, tx.Sequence, txHash)
a.metricService.SetGnfdProcessedBlockHeight(tx.Height)

// update next delivery sequence in DB for inturn relayer, for non-inturn relayer, there is enough time for
// sequence update, so they can track next start seq from chain
Expand All @@ -173,7 +185,6 @@ func (a *GreenfieldAssembler) processTx(tx *model.GreenfieldRelayTransaction, no
if err = a.daoManager.SequenceDao.Upsert(tx.ChannelId, tx.Sequence+1); err != nil {
return err
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type Config struct {
}

type AdminConfig struct {
ListenAddr string `json:"listen_addr"`
Port uint16 `json:"port"`
}

func (cfg *AdminConfig) Validate() {
if cfg.ListenAddr == "" {
panic("listen address should not be empty")
if cfg.Port <= 0 || cfg.Port > 65535 {
panic("port should be within (0, 65535]")
}
}

Expand Down
2 changes: 1 addition & 1 deletion integrationtest/config/config_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"compress": false
},
"admin_config": {
"listen_addr": "0.0.0.0:8080"
"port": 8080
},
"db_config": {
"dialect": "mysql",
Expand Down
14 changes: 10 additions & 4 deletions listener/bsc_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/executor/crosschain"
"github.com/bnb-chain/greenfield-relayer/logging"
"github.com/bnb-chain/greenfield-relayer/metric"
rtypes "github.com/bnb-chain/greenfield-relayer/types"
)

Expand All @@ -27,9 +28,10 @@ type BSCListener struct {
greenfieldExecutor *executor.GreenfieldExecutor
DaoManager *dao.DaoManager
crossChainAbi abi.ABI
monitorService *metric.MetricService
}

func NewBSCListener(cfg *config.Config, bscExecutor *executor.BSCExecutor, gnfdExecutor *executor.GreenfieldExecutor, dao *dao.DaoManager) *BSCListener {
func NewBSCListener(cfg *config.Config, bscExecutor *executor.BSCExecutor, gnfdExecutor *executor.GreenfieldExecutor, dao *dao.DaoManager, ms *metric.MetricService) *BSCListener {
crossChainAbi, err := abi.JSON(strings.NewReader(crosschain.CrosschainMetaData.ABI))
if err != nil {
panic("marshal abi error")
Expand All @@ -40,6 +42,7 @@ func NewBSCListener(cfg *config.Config, bscExecutor *executor.BSCExecutor, gnfdE
greenfieldExecutor: gnfdExecutor,
DaoManager: dao,
crossChainAbi: crossChainAbi,
monitorService: ms,
}
}

Expand Down Expand Up @@ -130,14 +133,17 @@ func (l *BSCListener) monitorCrossChainPkgAt(nextHeight uint64, latestPolledBloc
relayPkgs = append(relayPkgs, relayPkg)
}

return l.DaoManager.BSCDao.SaveBlockAndBatchPackages(
if err := l.DaoManager.BSCDao.SaveBlockAndBatchPackages(
&model.BscBlock{
BlockHash: nextHeightBlockHeader.Hash().String(),
ParentHash: nextHeightBlockHeader.ParentHash.String(),
Height: nextHeight,
BlockTime: int64(nextHeightBlockHeader.Time),
},
relayPkgs)
}, relayPkgs); err != nil {
return err
}
l.monitorService.SetBSCSavedBlockHeight(nextHeight)
return nil
}

func (l *BSCListener) queryCrossChainLogs(blockHash ethcommon.Hash) ([]types.Log, error) {
Expand Down
12 changes: 10 additions & 2 deletions listener/greenfield_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/bnb-chain/greenfield-relayer/db/model"
"github.com/bnb-chain/greenfield-relayer/executor"
"github.com/bnb-chain/greenfield-relayer/logging"
"github.com/bnb-chain/greenfield-relayer/metric"
"github.com/bnb-chain/greenfield-relayer/util"
)

Expand All @@ -25,14 +26,17 @@ type GreenfieldListener struct {
greenfieldExecutor *executor.GreenfieldExecutor
bscExecutor *executor.BSCExecutor
DaoManager *dao.DaoManager
metricService *metric.MetricService
}

func NewGreenfieldListener(cfg *config.Config, gnfdExecutor *executor.GreenfieldExecutor, bscExecutor *executor.BSCExecutor, dao *dao.DaoManager) *GreenfieldListener {
func NewGreenfieldListener(cfg *config.Config, gnfdExecutor *executor.GreenfieldExecutor, bscExecutor *executor.BSCExecutor,
dao *dao.DaoManager, ms *metric.MetricService) *GreenfieldListener {
return &GreenfieldListener{
config: cfg,
greenfieldExecutor: gnfdExecutor,
bscExecutor: bscExecutor,
DaoManager: dao,
metricService: ms,
}
}

Expand Down Expand Up @@ -83,7 +87,11 @@ func (l *GreenfieldListener) poll() error {
Height: uint64(block.Height),
BlockTime: block.Time.Unix(),
}
return l.DaoManager.GreenfieldDao.SaveBlockAndBatchTransactions(b, txs)
if err := l.DaoManager.GreenfieldDao.SaveBlockAndBatchTransactions(b, txs); err != nil {
return err
}
l.metricService.SetGnfdSavedBlockHeight(uint64(block.Height))
return nil
}
}
}
Expand Down
Loading

0 comments on commit 7a02e10

Please sign in to comment.