Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

Commit

Permalink
Merge branch 'cal/logs-debugging' of github.com:berachain/polaris int…
Browse files Browse the repository at this point in the history
…o zaki-with-logs
  • Loading branch information
calbera committed Feb 1, 2024
2 parents 1f5b812 + 3877af2 commit d8578aa
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 14 deletions.
5 changes: 4 additions & 1 deletion cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ func (ah *Provider) AnteHandler() func(
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 { //nolint:nestif // todo:fix.
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ah.isValidator {
return ctx, errors.New("validator cannot accept EVM from comet")
}
ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash())
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
ctx.Logger().Info("running evm ante handler for payload tx")
return ctx, nil
}
ctx.Logger().Error("running evm ante handler for payload tx in check tx")
return ctx, errors.New("payload envelope is not supported in CheckTx")
}
}
Expand Down
31 changes: 24 additions & 7 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,48 +44,63 @@ func (m *Mempool) AnteHandle(
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate)

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
ctx.Logger().Info("AnteHandle in Check/Recheck tx")
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash(), "mode", ctx.ExecMode())
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
ctx, ethTx,
); shouldEject {
ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash())
// m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
} else if ctx.ExecMode() == sdk.ExecModeReCheck && m.forceBroadcastOnRecheck {
// We optionally force a re-broadcast.
m.ForwardToValidator(ethTx)
}
ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash())
}
}
return next(ctx, tx, simulate)
}

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
ctx sdk.Context, tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil")
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
if m.validateStateless(ctx, tx) {
ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash())
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.validateStateful(ctx, tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool {
txHash := tx.Hash()
currentTime := ctx.BlockTime().Unix()
ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime)

// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
ctx.Logger().Info("validateStateless", "currentTime", currentTime, "timeFirstSeen", m.crc.TimeFirstSeen(txHash), "expired", expired)
priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0

if expired {
Expand All @@ -95,20 +110,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64)
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit)

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
ctx.Logger().Info("validateStateful", "included", included)
return included
}
16 changes: 11 additions & 5 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
// size of tx pool.
const (
txChanSize = 4096
maxRetries = 5
maxRetries = 0
retryDelay = 50 * time.Millisecond
statPeriod = 60 * time.Second
)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (h *handler) Start() error {
return errors.New("handler already started")
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
// go h.failedLoop() // Start the retry policy
go h.statLoop()
return nil
}
Expand Down Expand Up @@ -182,6 +182,7 @@ func (h *handler) failedLoop() {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries)
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}
Expand Down Expand Up @@ -235,11 +236,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) {
numBroadcasted := 0
for _, signedEthTx := range txs {
if !h.crc.IsRemoteTx(signedEthTx.Hash()) {
h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex())
h.broadcastTransaction(signedEthTx, maxRetries)
numBroadcasted++
}
}
h.logger.Debug(
h.logger.Info(
"broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted,
)
}
Expand All @@ -252,6 +254,8 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) {
return
}

h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes)

// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)
Expand All @@ -264,21 +268,23 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) {
// If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually
// the desired behaviour.
if rsp == nil || rsp.Code == 0 || rsp.Code == 1 {
h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code)
return
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyMempoolFull)
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure)
}

h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries)

h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
8 changes: 7 additions & 1 deletion cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
sCtx.Logger().Error("mempool insert: only one message is supported")
return errors.New("only one message is supported")
}

wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0])
if !ok {
// We have to return nil for non-ethereum transactions as to not fail check-tx.
sCtx.Logger().Info("mempool insert: not an ethereum transaction")
return nil
}

Expand All @@ -193,7 +195,11 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
}

// Add the eth tx to the remote cache.
_ = m.crc.MarkRemoteSeen(ethTx.Hash())
sCtx.Logger().Info(
"mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(),
"is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()),
)
m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
}
Expand Down

0 comments on commit d8578aa

Please sign in to comment.