diff --git a/core/tx_pool.go b/core/tx_pool.go index 5d0f1d452a..f38b83615e 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -80,6 +80,10 @@ var ( // than some meaningful limit a user might use. This is not a consensus error // making the transaction invalid, rather a DOS protection. ErrOversizedData = errors.New("oversized data") + + // ErrKnownTransaction is returned if a transaction that is already in the pool + // attempting to be added to the pool. + ErrKnownTransaction = errors.New("known transaction") ) var ( @@ -661,7 +665,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { hash := tx.Hash() if pool.all.Get(hash) != nil { logger.Warn().Str("hash", hash.Hex()).Msg("Discarding already known transaction") - return false, fmt.Errorf("known transaction: %x", hash) + return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash) } // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, local); err != nil { @@ -866,17 +870,10 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { defer pool.mu.Unlock() // Try to inject the transaction and update any state - isKnownTx := pool.all.Get(tx.Hash()) != nil replace, err := pool.add(tx, local) if err != nil { - if !isKnownTx { - pool.txnErrorSink([]types.RPCTransactionError{ - { - TxHashID: tx.Hash().Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: err.Error(), - }, - }) + if errors.Cause(err) != ErrKnownTransaction { + pool.txnErrorSink([]types.RPCTransactionError{*types.NewRPCTransactionError(tx.Hash(), err)}) } return err } @@ -906,17 +903,12 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { for i, tx := range txs { var replace bool - isKnownTx := pool.all.Get(tx.Hash()) != nil if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} } - if errs[i] != nil && !isKnownTx { - erroredTxns = append(erroredTxns, types.RPCTransactionError{ - TxHashID: tx.Hash().Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: errs[i].Error(), - }) + if errs[i] != nil && errors.Cause(errs[i]) != ErrKnownTransaction { + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), errs[i])) } } // Only reprocess the internal state if something was actually added @@ -1008,6 +1000,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Track the promoted transactions to broadcast them at once var promoted []*types.Transaction logger := utils.Logger().With().Stack().Logger() + var erroredTxns []types.RPCTransactionError // Gather all the accounts potentially needing updates if accounts == nil { @@ -1023,9 +1016,14 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { continue // Just in case someone calls with a non existing account } // Drop all transactions that are deemed too old (low nonce) - for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { + nonce := pool.currentState.GetNonce(addr) + for _, tx := range list.Forward(nonce) { hash := tx.Hash() logger.Warn().Str("hash", hash.Hex()).Msg("Removed old queued transaction") + if pool.chain.CurrentBlock().Transaction(hash) == nil { + err := fmt.Errorf("old transaction, nonce %d is too low", nonce) + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) + } pool.all.Remove(hash) pool.priced.Removed() } @@ -1034,6 +1032,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range drops { hash := tx.Hash() logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") + err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost()) + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.all.Remove(hash) pool.priced.Removed() queuedNofundsCounter.Inc(1) @@ -1050,10 +1050,12 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() + logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction") + err := fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String()) + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.all.Remove(hash) pool.priced.Removed() queuedRateLimitCounter.Inc(1) - logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction") } } // Delete the entire queue entry if it became empty. @@ -1097,8 +1099,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] for _, tx := range list.Cap(list.Len() - 1) { - // Drop the transaction from the global pools too hash := tx.Hash() + err := fmt.Errorf("fairness-exceeding pending transaction") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.all.Remove(hash) pool.priced.Removed() @@ -1121,6 +1124,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range list.Cap(list.Len() - 1) { // Drop the transaction from the global pools too hash := tx.Hash() + err := fmt.Errorf("fairness-exceeding pending transaction") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.all.Remove(hash) pool.priced.Removed() @@ -1161,6 +1166,8 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { + err := fmt.Errorf("exceeds global cap for queued transactions") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.removeTx(tx.Hash(), true) } drop -= size @@ -1170,12 +1177,16 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + err := fmt.Errorf("exceeds global cap for queued transactions") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(txs[i].Hash(), err)) pool.removeTx(txs[i].Hash(), true) drop-- queuedRateLimitCounter.Inc(1) } } } + + pool.txnErrorSink(erroredTxns) } // demoteUnexecutables removes invalid and processed transactions from the pools @@ -1194,11 +1205,8 @@ func (pool *TxPool) demoteUnexecutables() { hash := tx.Hash() logger.Warn().Str("hash", hash.Hex()).Msg("Removed old pending transaction") if pool.chain.CurrentBlock().Transaction(hash) == nil { - erroredTxns = append(erroredTxns, types.RPCTransactionError{ - TxHashID: hash.Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: fmt.Sprintf("old transaction, nonce %d is too low", nonce), - }) + err := fmt.Errorf("old transaction, nonce %d is too low", nonce) + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) } pool.all.Remove(hash) pool.priced.Removed() @@ -1208,12 +1216,8 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range drops { hash := tx.Hash() logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable pending transaction") - erroredTxns = append(erroredTxns, types.RPCTransactionError{ - TxHashID: hash.Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: fmt.Sprintf("unpayable transaction, out of gas or "+ - "balance of %d cannot pay cost of %d", tx.Value(), tx.Cost()), - }) + err := fmt.Errorf("unpayable transaction, out of gas or balance of %d cannot pay cost of %d", tx.Value(), tx.Cost()) + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.all.Remove(hash) pool.priced.Removed() pendingNofundsCounter.Inc(1) @@ -1221,11 +1225,8 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range invalids { hash := tx.Hash() logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction") - erroredTxns = append(erroredTxns, types.RPCTransactionError{ - TxHashID: hash.Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: "demoting pending transaction", - }) + err := fmt.Errorf("demoting pending transaction") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.enqueueTx(hash, tx) } // If there's a gap in front, alert (should never happen) and postpone all transactions @@ -1233,11 +1234,8 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range list.Cap(0) { hash := tx.Hash() logger.Error().Str("hash", hash.Hex()).Msg("Demoting invalidated transaction") - erroredTxns = append(erroredTxns, types.RPCTransactionError{ - TxHashID: hash.Hex(), - TimestampOfRejection: time.Now().Unix(), - ErrMessage: "demoting invalid transaction", - }) + err := fmt.Errorf("demoting invalid transaction") + erroredTxns = append(erroredTxns, *types.NewRPCTransactionError(tx.Hash(), err)) pool.enqueueTx(hash, tx) } } diff --git a/core/types/transaction.go b/core/types/transaction.go index 2a86857d10..345ed3e3f9 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -23,6 +23,7 @@ import ( "io" "math/big" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -40,6 +41,15 @@ type RPCTransactionError struct { ErrMessage string `json:"error-message"` } +// NewRPCTransactionError ... +func NewRPCTransactionError(hash common.Hash, err error) *RPCTransactionError { + return &RPCTransactionError{ + TxHashID: hash.String(), + TimestampOfRejection: time.Now().Unix(), + ErrMessage: err.Error(), + } +} + // no go:generate gencodec -type txdata -field-override txdataMarshaling -out gen_tx_json.go // Errors constants for Transaction. diff --git a/node/node.go b/node/node.go index 62ab8b8973..453dea5ea8 100644 --- a/node/node.go +++ b/node/node.go @@ -323,12 +323,16 @@ func (node *Node) AddPendingStakingTransaction( // AddPendingTransaction adds one new transaction to the pending transaction list. // This is only called from SDK. func (node *Node) AddPendingTransaction(newTx *types.Transaction) { - if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID { + role := node.NodeConfig.Role() + if newTx.ShardID() == node.NodeConfig.ShardID && (node.Consensus.IsLeader() || role == nodeconfig.ExplorerNode) { node.addPendingTransactions(types.Transactions{newTx}) - } else { - utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx") - node.tryBroadcast(newTx) + if role != nodeconfig.ExplorerNode { + return + } } + utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx") + node.tryBroadcast(newTx) + } // AddPendingReceipts adds one receipt message to pending list.