Skip to content

Commit

Permalink
Report failed transactions (#2157)
Browse files Browse the repository at this point in the history
* [core/tx_pool] add `ErrKnownTransaction` & report errors from `promoteExecutables`

* [node] Make explorer node add pending transactions

* [core/types] Add `NewRPCTransactionError`

* [core/tx_pool] Refactor error sink msgs to use `NewRPCTransactionError`
  • Loading branch information
Daniel-VDM authored and rlan35 committed Jan 28, 2020
1 parent e1bd071 commit 1cea6c6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 45 deletions.
80 changes: 39 additions & 41 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ var (
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")

// ErrKnownTransaction is returned if a transaction that is already in the pool
// attempting to be added to the pool.
ErrKnownTransaction = errors.New("known transaction")
)

var (
Expand Down Expand Up @@ -661,7 +665,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
hash := tx.Hash()
if pool.all.Get(hash) != nil {
logger.Warn().Str("hash", hash.Hex()).Msg("Discarding already known transaction")
return false, fmt.Errorf("known transaction: %x", hash)
return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash)
}
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
Expand Down Expand Up @@ -866,17 +870,10 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
defer pool.mu.Unlock()

// Try to inject the transaction and update any state
isKnownTx := pool.all.Get(tx.Hash()) != nil
replace, err := pool.add(tx, local)
if err != nil {
if !isKnownTx {
pool.txnErrorSink([]types.RPCTransactionError{
{
TxHashID: tx.Hash().Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
},
})
if errors.Cause(err) != ErrKnownTransaction {
pool.txnErrorSink([]types.RPCTransactionError{*types.NewRPCTransactionError(tx.Hash(), err)})
}
return err
}
Expand Down Expand Up @@ -906,17 +903,12 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {

for i, tx := range txs {
var replace bool
isKnownTx := pool.all.Get(tx.Hash()) != nil
if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
}
if errs[i] != nil && !isKnownTx {
erroredTxns = append(erroredTxns, types.RPCTransactionError{
TxHashID: tx.Hash().Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: errs[i].Error(),
})
if errs[i] != nil && errors.Cause(errs[i]) != ErrKnownTransaction {
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), errs[i]))
}
}
// Only reprocess the internal state if something was actually added
Expand Down Expand Up @@ -1008,6 +1000,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
logger := utils.Logger().With().Stack().Logger()
var erroredTxns []types.RPCTransactionError

// Gather all the accounts potentially needing updates
if accounts == nil {
Expand All @@ -1023,9 +1016,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
nonce := pool.currentState.GetNonce(addr)
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed old queued transaction")
if pool.chain.CurrentBlock().Transaction(hash) == nil {
err := fmt.Errorf("old transaction, nonce %d is too low", nonce)
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
}
pool.all.Remove(hash)
pool.priced.Removed()
}
Expand All @@ -1034,6 +1032,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range drops {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction")
err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
Expand All @@ -1050,10 +1050,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction")
err := fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction")
}
}
// Delete the entire queue entry if it became empty.
Expand Down Expand Up @@ -1097,8 +1099,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
err := fmt.Errorf("fairness-exceeding pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()

Expand All @@ -1121,6 +1124,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
err := fmt.Errorf("fairness-exceeding pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()

Expand Down Expand Up @@ -1161,6 +1166,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
err := fmt.Errorf("exceeds global cap for queued transactions")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.removeTx(tx.Hash(), true)
}
drop -= size
Expand All @@ -1170,12 +1177,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
err := fmt.Errorf("exceeds global cap for queued transactions")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(txs[i].Hash(), err))
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}

pool.txnErrorSink(erroredTxns)
}

// demoteUnexecutables removes invalid and processed transactions from the pools
Expand All @@ -1194,11 +1205,8 @@ func (pool *TxPool) demoteUnexecutables() {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed old pending transaction")
if pool.chain.CurrentBlock().Transaction(hash) == nil {
erroredTxns = append(erroredTxns, types.RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: fmt.Sprintf("old transaction, nonce %d is too low", nonce),
})
err := fmt.Errorf("old transaction, nonce %d is too low", nonce)
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
}
pool.all.Remove(hash)
pool.priced.Removed()
Expand All @@ -1208,36 +1216,26 @@ func (pool *TxPool) demoteUnexecutables() {
for _, tx := range drops {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable pending transaction")
erroredTxns = append(erroredTxns, types.RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: fmt.Sprintf("unpayable transaction, out of gas or "+
"balance of %d cannot pay cost of %d", tx.Value(), tx.Cost()),
})
err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost())
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.all.Remove(hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
for _, tx := range invalids {
hash := tx.Hash()
logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction")
erroredTxns = append(erroredTxns, types.RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: "demoting pending transaction",
})
err := fmt.Errorf("demoting pending transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.enqueueTx(hash, tx)
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
logger.Error().Str("hash", hash.Hex()).Msg("Demoting invalidated transaction")
erroredTxns = append(erroredTxns, types.RPCTransactionError{
TxHashID: hash.Hex(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: "demoting invalid transaction",
})
err := fmt.Errorf("demoting invalid transaction")
erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err))
pool.enqueueTx(hash, tx)
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"math/big"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand All @@ -40,6 +41,15 @@ type RPCTransactionError struct {
ErrMessage string `json:"error-message"`
}

// NewRPCTransactionError ...
func NewRPCTransactionError(hash common.Hash, err error) *RPCTransactionError {
return &RPCTransactionError{
TxHashID: hash.String(),
TimestampOfRejection: time.Now().Unix(),
ErrMessage: err.Error(),
}
}

// no go:generate gencodec -type txdata -field-override txdataMarshaling -out gen_tx_json.go

// Errors constants for Transaction.
Expand Down
12 changes: 8 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,16 @@ func (node *Node) AddPendingStakingTransaction(
// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID {
role := node.NodeConfig.Role()
if newTx.ShardID() == node.NodeConfig.ShardID && (node.Consensus.IsLeader() || role == nodeconfig.ExplorerNode) {
node.addPendingTransactions(types.Transactions{newTx})
} else {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
if role != nodeconfig.ExplorerNode {
return
}
}
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)

}

// AddPendingReceipts adds one receipt message to pending list.
Expand Down

0 comments on commit 1cea6c6

Please sign in to comment.