From 5b27f633f0bb0f818f9b42c119a4773d1a0f321a Mon Sep 17 00:00:00 2001 From: skosito Date: Tue, 7 May 2024 23:02:14 +0200 Subject: [PATCH 01/11] Fork sender nonce mempool and modify so it works for unsigned txs --- app/app.go | 5 + cmd/zetacored/mempool.go | 321 +++++++++++++++++++++++++++++++++++++++ cmd/zetacored/root.go | 7 +- 3 files changed, 331 insertions(+), 2 deletions(-) create mode 100644 cmd/zetacored/mempool.go diff --git a/app/app.go b/app/app.go index 234c979e7d..1fc5accfb9 100644 --- a/app/app.go +++ b/app/app.go @@ -297,6 +297,7 @@ func New( bApp.SetCommitMultiStoreTracer(traceStore) bApp.SetVersion(version.Version) bApp.SetInterfaceRegistry(interfaceRegistry) + bApp.SetTxEncoder(encodingConfig.TxConfig.TxEncoder()) keys := sdk.NewKVStoreKeys( authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, @@ -341,6 +342,10 @@ func New( app.ConsensusParamsKeeper = consensusparamkeeper.NewKeeper(appCodec, keys[consensusparamtypes.StoreKey], authAddr) bApp.SetParamStore(&app.ConsensusParamsKeeper) + // DefaultProposalHandler also checks for sigs, probably will need custom + // customProposalHandler := NewCustomProposalHandler(bApp.Mempool(), bApp) + // app.SetPrepareProposal(customProposalHandler.PrepareProposalHandler()) + // add keepers // use custom Ethermint account for contracts app.AccountKeeper = authkeeper.NewAccountKeeper( diff --git a/cmd/zetacored/mempool.go b/cmd/zetacored/mempool.go new file mode 100644 index 0000000000..c3d923612d --- /dev/null +++ b/cmd/zetacored/mempool.go @@ -0,0 +1,321 @@ +// This is fork of SenderNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/sender_nonce.go) +// only change is part where signatures are checked + +// this is just for illustration, if we go with similar approach we would use priority nonce mempool, which has same issue but is more complex +// so testing with this one for now + +package main + +import ( + "context" + crand "crypto/rand" // #nosec // crypto/rand is used for seed generation + "encoding/binary" + "fmt" + "math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand + + ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/cosmos/cosmos-sdk/x/auth/signing" + evmtypes "github.com/evmos/ethermint/x/evm/types" +) + +var ( + _ mempool.Mempool = (*SenderNonceMempool)(nil) + _ mempool.Iterator = (*senderNonceMempoolIterator)(nil) +) + +var DefaultMaxTx = 0 + +// SenderNonceMempool is a mempool that prioritizes transactions within a sender +// by nonce, the lowest first, but selects a random sender on each iteration. +// The mempool is iterated by: +// +// 1) Maintaining a separate list of nonce ordered txs per sender +// 2) For each select iteration, randomly choose a sender and pick the next nonce ordered tx from their list +// 3) Repeat 1,2 until the mempool is exhausted +// +// Note that PrepareProposal could choose to stop iteration before reaching the +// end if maxBytes is reached. +type SenderNonceMempool struct { + senders map[string]*skiplist.SkipList + rnd *rand.Rand + maxTx int + existingTx map[txKey]bool +} + +type SenderNonceOptions func(mp *SenderNonceMempool) + +type txKey struct { + address string + nonce uint64 +} + +// NewSenderNonceMempool creates a new mempool that prioritizes transactions by +// nonce, the lowest first, picking a random sender on each iteration. +func NewSenderNonceMempool(opts ...SenderNonceOptions) *SenderNonceMempool { + senderMap := make(map[string]*skiplist.SkipList) + existingTx := make(map[txKey]bool) + snp := &SenderNonceMempool{ + senders: senderMap, + maxTx: DefaultMaxTx, + existingTx: existingTx, + } + + var seed int64 + err := binary.Read(crand.Reader, binary.BigEndian, &seed) + if err != nil { + panic(err) + } + + snp.setSeed(seed) + + for _, opt := range opts { + opt(snp) + } + + return snp +} + +// SenderNonceSeedOpt Option To add a Seed for random type when calling the +// constructor NewSenderNonceMempool. +// +// Example: +// +// random_seed := int64(1000) +// NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed)) +func SenderNonceSeedOpt(seed int64) SenderNonceOptions { + return func(snp *SenderNonceMempool) { + snp.setSeed(seed) + } +} + +// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor +// NewSenderNonceMempool. +// +// Example: +// +// NewSenderNonceMempool(SenderNonceMaxTxOpt(100)) +func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions { + return func(snp *SenderNonceMempool) { + snp.maxTx = maxTx + } +} + +func (snm *SenderNonceMempool) setSeed(seed int64) { + s1 := rand.NewSource(seed) + snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default +} + +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senders[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + +// Insert adds a tx to the mempool. It returns an error if the tx does not have +// at least one signer. Note, priority is ignored. +func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { + if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx { + return mempool.ErrMempoolTxMaxCapacity + } + if snm.maxTx < 0 { + return nil + } + + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + + msgs := tx.GetMsgs() + unsignedSender := "" + unsignedNonce := uint64(0) + for _, msg := range msgs { + ethMsg, ok := msg.(*evmtypes.MsgEthereumTx) + if !ok { + continue + } else { + ethAddr := ethcommon.HexToAddress(ethMsg.From) + addr := sdk.AccAddress(ethAddr.Bytes()) + unsignedSender = addr.String() + unsignedNonce = ethMsg.AsTransaction().Nonce() + } + } + + var sender string + var nonce uint64 + if unsignedSender != "" { + sender = unsignedSender + nonce = unsignedNonce + } else { + if len(sigs) == 0 { + return fmt.Errorf("tx must have at least one signer") + } + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + senderTxs, found := snm.senders[sender] + if !found { + senderTxs = skiplist.New(skiplist.Uint64) + snm.senders[sender] = senderTxs + } + + senderTxs.Set(nonce, tx) + + key := txKey{nonce: nonce, address: sender} + snm.existingTx[key] = true + + return nil +} + +// Select returns an iterator ordering transactions the mempool with the lowest +// nonce of a random selected sender first. +// +// NOTE: It is not safe to use this iterator while removing transactions from +// the underlying mempool. +func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { + var senders []string + + senderCursors := make(map[string]*skiplist.Element) + orderedSenders := skiplist.New(skiplist.String) + + // #nosec + for s := range snm.senders { + orderedSenders.Set(s, s) + } + + s := orderedSenders.Front() + for s != nil { + sender := s.Value.(string) + senders = append(senders, sender) + senderCursors[sender] = snm.senders[sender].Front() + s = s.Next() + } + + iter := &senderNonceMempoolIterator{ + senders: senders, + rnd: snm.rnd, + senderCursors: senderCursors, + } + + return iter.Next() +} + +// CountTx returns the total count of txs in the mempool. +func (snm *SenderNonceMempool) CountTx() int { + return len(snm.existingTx) +} + +// Remove removes a tx from the mempool. It returns an error if the tx does not +// have at least one signer or the tx was not found in the pool. +func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + msgs := tx.GetMsgs() + unsignedSender := "" + unsignedNonce := uint64(0) + for _, msg := range msgs { + ethMsg, ok := msg.(*evmtypes.MsgEthereumTx) + if !ok { + continue + } else { + ethAddr := ethcommon.HexToAddress(ethMsg.From) + addr := sdk.AccAddress(ethAddr.Bytes()) + unsignedSender = addr.String() + unsignedNonce = ethMsg.AsTransaction().Nonce() + } + } + + var sender string + var nonce uint64 + if unsignedSender != "" { + sender = unsignedSender + nonce = unsignedNonce + } else { + if len(sigs) == 0 { + return fmt.Errorf("tx must have at least one signer") + } + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + senderTxs, found := snm.senders[sender] + + if !found { + return mempool.ErrTxNotFound + } + + res := senderTxs.Remove(nonce) + + if res == nil { + return mempool.ErrTxNotFound + } + + if senderTxs.Len() == 0 { + delete(snm.senders, sender) + } + + key := txKey{nonce: nonce, address: sender} + delete(snm.existingTx, key) + + return nil +} + +type senderNonceMempoolIterator struct { + rnd *rand.Rand + currentTx *skiplist.Element + senders []string + senderCursors map[string]*skiplist.Element +} + +// Next returns the next iterator state which will contain a tx with the next +// smallest nonce of a randomly selected sender. +func (i *senderNonceMempoolIterator) Next() mempool.Iterator { + for len(i.senders) > 0 { + senderIndex := i.rnd.Intn(len(i.senders)) + sender := i.senders[senderIndex] + senderCursor, found := i.senderCursors[sender] + if !found { + i.senders = removeAtIndex(i.senders, senderIndex) + continue + } + + if nextCursor := senderCursor.Next(); nextCursor != nil { + i.senderCursors[sender] = nextCursor + } else { + i.senders = removeAtIndex(i.senders, senderIndex) + } + + return &senderNonceMempoolIterator{ + senders: i.senders, + currentTx: senderCursor, + rnd: i.rnd, + senderCursors: i.senderCursors, + } + } + + return nil +} + +func (i *senderNonceMempoolIterator) Tx() sdk.Tx { + return i.currentTx.Value.(sdk.Tx) +} + +func removeAtIndex[T any](slice []T, index int) []T { + return append(slice[:index], slice[index+1:]...) +} diff --git a/cmd/zetacored/root.go b/cmd/zetacored/root.go index d5adb514db..1f14f30aa1 100644 --- a/cmd/zetacored/root.go +++ b/cmd/zetacored/root.go @@ -8,7 +8,6 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client/snapshot" - "github.com/cosmos/cosmos-sdk/types/mempool" appparams "cosmossdk.io/simapp/params" tmcfg "github.com/cometbft/cometbft/config" @@ -233,7 +232,11 @@ func (ac appCreator) newApp( ) servertypes.Application { baseappOptions := server.DefaultBaseappOptions(appOpts) baseappOptions = append(baseappOptions, func(app *baseapp.BaseApp) { - app.SetMempool(mempool.NoOpMempool{}) + app.SetMempool(NewSenderNonceMempool( + // should be param + SenderNonceMaxTxOpt(cast.ToInt(1000)), + )) + }) skipUpgradeHeights := make(map[int64]bool) for _, h := range cast.ToIntSlice(appOpts.Get(server.FlagUnsafeSkipUpgrades)) { From 13b5c4074d5c7dfd44ce0632090018046e952ee4 Mon Sep 17 00:00:00 2001 From: skosito Date: Thu, 9 May 2024 15:53:25 +0200 Subject: [PATCH 02/11] try out priority nonce mempool with sig workaround --- app/ante/handler_options.go | 1 + app/ante/priority.go | 27 ++ cmd/zetacored/mempool.go | 97 +++---- cmd/zetacored/priority_mempool.go | 442 ++++++++++++++++++++++++++++++ cmd/zetacored/root.go | 10 +- 5 files changed, 513 insertions(+), 64 deletions(-) create mode 100644 app/ante/priority.go create mode 100644 cmd/zetacored/priority_mempool.go diff --git a/app/ante/handler_options.go b/app/ante/handler_options.go index b620f01d9b..9ae24bf56e 100644 --- a/app/ante/handler_options.go +++ b/app/ante/handler_options.go @@ -130,6 +130,7 @@ func newCosmosAnteHandlerForSystemTx(options HandlerOptions) sdk.AnteHandler { ante.NewValidateMemoDecorator(options.AccountKeeper), ante.NewConsumeGasForTxSizeDecorator(options.AccountKeeper), ante.NewDeductFeeDecorator(options.AccountKeeper, options.BankKeeper, options.FeegrantKeeper, options.TxFeeChecker), + NewSystemPriorityDecorator(), // SetPubKeyDecorator must be called before all signature verification decorators ante.NewSetPubKeyDecorator(options.AccountKeeper), ante.NewValidateSigCountDecorator(options.AccountKeeper), diff --git a/app/ante/priority.go b/app/ante/priority.go new file mode 100644 index 0000000000..22c93fa7d0 --- /dev/null +++ b/app/ante/priority.go @@ -0,0 +1,27 @@ +package ante + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ sdk.AnteDecorator = SystemPriorityDecorator{} + +// SystemPriorityDecorator adds bigger priority for system messages +type SystemPriorityDecorator struct { +} + +// NewSystemPriorityDecorator creates a decorator to add bigger priority for system messages +func NewSystemPriorityDecorator() SystemPriorityDecorator { + return SystemPriorityDecorator{} +} + +// AnteHandle implements AnteDecorator +func (vad SystemPriorityDecorator) AnteHandle( + ctx sdk.Context, + tx sdk.Tx, + simulate bool, + next sdk.AnteHandler, +) (sdk.Context, error) { + newCtx := ctx.WithPriority(500000000) // arbirtrary value, to be revisited, maybe relative to current context.Priority (eg. double it) + return next(newCtx, tx, simulate) +} diff --git a/cmd/zetacored/mempool.go b/cmd/zetacored/mempool.go index c3d923612d..508ec22515 100644 --- a/cmd/zetacored/mempool.go +++ b/cmd/zetacored/mempool.go @@ -18,6 +18,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" + authante "github.com/cosmos/cosmos-sdk/x/auth/ante" "github.com/cosmos/cosmos-sdk/x/auth/signing" evmtypes "github.com/evmos/ethermint/x/evm/types" ) @@ -132,40 +133,11 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { return nil } - sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + sender, nonce, err := getSenderAndNonce(tx) if err != nil { return err } - msgs := tx.GetMsgs() - unsignedSender := "" - unsignedNonce := uint64(0) - for _, msg := range msgs { - ethMsg, ok := msg.(*evmtypes.MsgEthereumTx) - if !ok { - continue - } else { - ethAddr := ethcommon.HexToAddress(ethMsg.From) - addr := sdk.AccAddress(ethAddr.Bytes()) - unsignedSender = addr.String() - unsignedNonce = ethMsg.AsTransaction().Nonce() - } - } - - var sender string - var nonce uint64 - if unsignedSender != "" { - sender = unsignedSender - nonce = unsignedNonce - } else { - if len(sigs) == 0 { - return fmt.Errorf("tx must have at least one signer") - } - sig := sigs[0] - sender = sdk.AccAddress(sig.PubKey.Address()).String() - nonce = sig.Sequence - } - senderTxs, found := snm.senders[sender] if !found { senderTxs = skiplist.New(skiplist.Uint64) @@ -221,38 +193,10 @@ func (snm *SenderNonceMempool) CountTx() int { // Remove removes a tx from the mempool. It returns an error if the tx does not // have at least one signer or the tx was not found in the pool. func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { - sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + sender, nonce, err := getSenderAndNonce(tx) if err != nil { return err } - msgs := tx.GetMsgs() - unsignedSender := "" - unsignedNonce := uint64(0) - for _, msg := range msgs { - ethMsg, ok := msg.(*evmtypes.MsgEthereumTx) - if !ok { - continue - } else { - ethAddr := ethcommon.HexToAddress(ethMsg.From) - addr := sdk.AccAddress(ethAddr.Bytes()) - unsignedSender = addr.String() - unsignedNonce = ethMsg.AsTransaction().Nonce() - } - } - - var sender string - var nonce uint64 - if unsignedSender != "" { - sender = unsignedSender - nonce = unsignedNonce - } else { - if len(sigs) == 0 { - return fmt.Errorf("tx must have at least one signer") - } - sig := sigs[0] - sender = sdk.AccAddress(sig.PubKey.Address()).String() - nonce = sig.Sequence - } senderTxs, found := snm.senders[sender] @@ -319,3 +263,38 @@ func (i *senderNonceMempoolIterator) Tx() sdk.Tx { func removeAtIndex[T any](slice []T, index int) []T { return append(slice[:index], slice[index+1:]...) } + +func getSenderAndNonce(tx sdk.Tx) (string, uint64, error) { + if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { + opts := txWithExtensions.GetExtensionOptions() + if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { + for _, msg := range tx.GetMsgs() { + if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { + ethAddr := ethcommon.HexToAddress(ethMsg.From) + addr := sdk.AccAddress(ethAddr.Bytes()) + return addr.String(), ethMsg.AsTransaction().Nonce(), nil + } + } + } + } + + return getSenderAndNonceDefault(tx) +} + +func getSenderAndNonceDefault(tx sdk.Tx) (string, uint64, error) { + sigTx, ok := tx.(signing.SigVerifiableTx) + if !ok { + return "", 0, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) + } + + sigs, err := sigTx.GetSignaturesV2() + if err != nil { + return "", 0, err + } + + if len(sigs) == 0 { + return "", 0, fmt.Errorf("tx must have at least one signer") + } + + return sigs[0].PubKey.Address().String(), sigs[0].Sequence, nil +} diff --git a/cmd/zetacored/priority_mempool.go b/cmd/zetacored/priority_mempool.go new file mode 100644 index 0000000000..b9fd582d60 --- /dev/null +++ b/cmd/zetacored/priority_mempool.go @@ -0,0 +1,442 @@ +// This is fork of SenderNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go) +// only change is part where signatures are checked + +// this is just for illustration, if we go with similar approach we would use priority nonce mempool, which has same issue but is more complex +// so testing with this one for now + +package main + +import ( + "context" + "fmt" + "math" + + "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" +) + +var ( + _ mempool.Mempool = (*PriorityNonceMempool)(nil) + _ mempool.Iterator = (*PriorityNonceIterator)(nil) +) + +// PriorityNonceMempool is a mempool implementation that stores txs +// in a partially ordered set by 2 dimensions: priority, and sender-nonce +// (sequence number). Internally it uses one priority ordered skip list and one +// skip list per sender ordered by sender-nonce (sequence number). When there +// are multiple txs from the same sender, they are not always comparable by +// priority to other sender txs and must be partially ordered by both sender-nonce +// and priority. +type PriorityNonceMempool struct { + priorityIndex *skiplist.SkipList + priorityCounts map[int64]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta]txMeta + onRead func(tx sdk.Tx) + txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool + maxTx int +} + +type PriorityNonceIterator struct { + senderCursors map[string]*skiplist.Element + nextPriority int64 + sender string + priorityNode *skiplist.Element + mempool *PriorityNonceMempool +} + +// txMeta stores transaction metadata used in indices +type txMeta struct { + // nonce is the sender's sequence number + nonce uint64 + // priority is the transaction's priority + priority int64 + // sender is the transaction's sender + sender string + // weight is the transaction's weight, used as a tiebreaker for transactions with the same priority + weight int64 + // senderElement is a pointer to the transaction's element in the sender index + senderElement *skiplist.Element +} + +// txMetaLess is a comparator for txKeys that first compares priority, then weight, +// then sender, then nonce, uniquely identifying a transaction. +// +// Note, txMetaLess is used as the comparator in the priority index. +func txMetaLess(a, b any) int { + keyA := a.(txMeta) + keyB := b.(txMeta) + res := skiplist.Int64.Compare(keyA.priority, keyB.priority) + if res != 0 { + return res + } + + // Weight is used as a tiebreaker for transactions with the same priority. + // Weight is calculated in a single pass in .Select(...) and so will be 0 + // on .Insert(...). + res = skiplist.Int64.Compare(keyA.weight, keyB.weight) + if res != 0 { + return res + } + + // Because weight will be 0 on .Insert(...), we must also compare sender and + // nonce to resolve priority collisions. If we didn't then transactions with + // the same priority would overwrite each other in the priority index. + res = skiplist.String.Compare(keyA.sender, keyB.sender) + if res != 0 { + return res + } + + return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) +} + +type PriorityNonceMempoolOption func(*PriorityNonceMempool) + +// PriorityNonceWithOnRead sets a callback to be called when a tx is read from +// the mempool. +func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.onRead = onRead + } +} + +// PriorityNonceWithTxReplacement sets a callback to be called when duplicated +// transaction nonce detected during mempool insert. An application can define a +// transaction replacement rule based on tx priority or certain transaction fields. +func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.txReplacement = txReplacementRule + } +} + +// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the +// mempool with the semantics: +// +// <0: disabled, `Insert` is a no-op +// 0: unlimited +// >0: maximum number of transactions allowed +func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.maxTx = maxTx + } +} + +// DefaultPriorityMempool returns a priorityNonceMempool with no options. +func DefaultPriorityMempool() mempool.Mempool { + return NewPriorityMempool() +} + +// NewPriorityMempool returns the SDK's default mempool implementation which +// returns txs in a partial order by 2 dimensions; priority, and sender-nonce. +func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { + mp := &PriorityNonceMempool{ + priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), + priorityCounts: make(map[int64]int), + senderIndices: make(map[string]*skiplist.SkipList), + scores: make(map[txMeta]txMeta), + } + + for _, opt := range opts { + opt(mp) + } + + return mp +} + +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senderIndices[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + +// Insert attempts to insert a Tx into the app-side mempool in O(log n) time, +// returning an error if unsuccessful. Sender and nonce are derived from the +// transaction's first signature. +// +// Transactions are unique by sender and nonce. Inserting a duplicate tx is an +// O(log n) no-op. +// +// Inserting a duplicate tx with a different priority overwrites the existing tx, +// changing the total order of the mempool. +func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { + if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { + return mempool.ErrMempoolTxMaxCapacity + } else if mp.maxTx < 0 { + return nil + } + + sender, nonce, err := getSenderAndNonce(tx) + if err != nil { + return err + } + + sdkContext := sdk.UnwrapSDKContext(ctx) + priority := sdkContext.Priority() + fmt.Println("insert priority ", priority) + key := txMeta{nonce: nonce, priority: priority, sender: sender} + + senderIndex, ok := mp.senderIndices[sender] + if !ok { + senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { + return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + })) + + // initialize sender index if not found + mp.senderIndices[sender] = senderIndex + } + + // Since mp.priorityIndex is scored by priority, then sender, then nonce, a + // changed priority will create a new key, so we must remove the old key and + // re-insert it to avoid having the same tx with different priorityIndex indexed + // twice in the mempool. + // + // This O(log n) remove operation is rare and only happens when a tx's priority + // changes. + sk := txMeta{nonce: nonce, sender: sender} + if oldScore, txExists := mp.scores[sk]; txExists { + if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { + return fmt.Errorf( + "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", + oldScore.priority, + priority, + senderIndex.Get(key).Value.(sdk.Tx), + tx, + ) + } + + mp.priorityIndex.Remove(txMeta{ + nonce: nonce, + sender: sender, + priority: oldScore.priority, + weight: oldScore.weight, + }) + mp.priorityCounts[oldScore.priority]-- + } + + mp.priorityCounts[priority]++ + + // Since senderIndex is scored by nonce, a changed priority will overwrite the + // existing key. + key.senderElement = senderIndex.Set(key, tx) + + mp.scores[sk] = txMeta{priority: priority} + mp.priorityIndex.Set(key, tx) + + return nil +} + +func (i *PriorityNonceIterator) iteratePriority() mempool.Iterator { + // beginning of priority iteration + if i.priorityNode == nil { + i.priorityNode = i.mempool.priorityIndex.Front() + } else { + i.priorityNode = i.priorityNode.Next() + } + + // end of priority iteration + if i.priorityNode == nil { + return nil + } + + i.sender = i.priorityNode.Key().(txMeta).sender + + nextPriorityNode := i.priorityNode.Next() + if nextPriorityNode != nil { + i.nextPriority = nextPriorityNode.Key().(txMeta).priority + } else { + i.nextPriority = math.MinInt64 + } + + return i.Next() +} + +func (i *PriorityNonceIterator) Next() mempool.Iterator { + if i.priorityNode == nil { + return nil + } + + cursor, ok := i.senderCursors[i.sender] + if !ok { + // beginning of sender iteration + cursor = i.mempool.senderIndices[i.sender].Front() + } else { + // middle of sender iteration + cursor = cursor.Next() + } + + // end of sender iteration + if cursor == nil { + return i.iteratePriority() + } + + key := cursor.Key().(txMeta) + + // We've reached a transaction with a priority lower than the next highest + // priority in the pool. + if key.priority < i.nextPriority { + return i.iteratePriority() + } else if key.priority == i.nextPriority && i.priorityNode.Next() != nil { + // Weight is incorporated into the priority index key only (not sender index) + // so we must fetch it here from the scores map. + weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight + if weight < i.priorityNode.Next().Key().(txMeta).weight { + return i.iteratePriority() + } + } + + i.senderCursors[i.sender] = cursor + return i +} + +func (i *PriorityNonceIterator) Tx() sdk.Tx { + return i.senderCursors[i.sender].Value.(sdk.Tx) +} + +// Select returns a set of transactions from the mempool, ordered by priority +// and sender-nonce in O(n) time. The passed in list of transactions are ignored. +// This is a readonly operation, the mempool is not modified. +// +// The maxBytes parameter defines the maximum number of bytes of transactions to +// return. +// +// NOTE: It is not safe to use this iterator while removing transactions from +// the underlying mempool. +func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { + if mp.priorityIndex.Len() == 0 { + return nil + } + + mp.reorderPriorityTies() + + iterator := &PriorityNonceIterator{ + mempool: mp, + senderCursors: make(map[string]*skiplist.Element), + } + + return iterator.iteratePriority() +} + +type reorderKey struct { + deleteKey txMeta + insertKey txMeta + tx sdk.Tx +} + +func (mp *PriorityNonceMempool) reorderPriorityTies() { + node := mp.priorityIndex.Front() + + var reordering []reorderKey + for node != nil { + key := node.Key().(txMeta) + if mp.priorityCounts[key.priority] > 1 { + newKey := key + newKey.weight = senderWeight(key.senderElement) + reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) + } + + node = node.Next() + } + + for _, k := range reordering { + mp.priorityIndex.Remove(k.deleteKey) + delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) + mp.priorityIndex.Set(k.insertKey, k.tx) + mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey + } +} + +// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is +// defined as the first (nonce-wise) same sender tx with a priority not equal to +// t. It is used to resolve priority collisions, that is when 2 or more txs from +// different senders have the same priority. +func senderWeight(senderCursor *skiplist.Element) int64 { + if senderCursor == nil { + return 0 + } + + weight := senderCursor.Key().(txMeta).priority + senderCursor = senderCursor.Next() + for senderCursor != nil { + p := senderCursor.Key().(txMeta).priority + if p != weight { + weight = p + } + + senderCursor = senderCursor.Next() + } + + return weight +} + +// CountTx returns the number of transactions in the mempool. +func (mp *PriorityNonceMempool) CountTx() int { + return mp.priorityIndex.Len() +} + +// Remove removes a transaction from the mempool in O(log n) time, returning an +// error if unsuccessful. +func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { + sender, nonce, err := getSenderAndNonce(tx) + if err != nil { + return err + } + + scoreKey := txMeta{nonce: nonce, sender: sender} + score, ok := mp.scores[scoreKey] + if !ok { + return mempool.ErrTxNotFound + } + tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + + senderTxs, ok := mp.senderIndices[sender] + if !ok { + return fmt.Errorf("sender %s not found", sender) + } + + mp.priorityIndex.Remove(tk) + senderTxs.Remove(tk) + delete(mp.scores, scoreKey) + mp.priorityCounts[score.priority]-- + + return nil +} + +func IsEmpty(mempool mempool.Mempool) error { + mp := mempool.(*PriorityNonceMempool) + if mp.priorityIndex.Len() != 0 { + return fmt.Errorf("priorityIndex not empty") + } + + var countKeys []int64 + for k := range mp.priorityCounts { + countKeys = append(countKeys, k) + } + + for _, k := range countKeys { + if mp.priorityCounts[k] != 0 { + return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k]) + } + } + + var senderKeys []string + for k := range mp.senderIndices { + senderKeys = append(senderKeys, k) + } + + for _, k := range senderKeys { + if mp.senderIndices[k].Len() != 0 { + return fmt.Errorf("senderIndex not empty for sender %v", k) + } + } + + return nil +} diff --git a/cmd/zetacored/root.go b/cmd/zetacored/root.go index 1f14f30aa1..233d8b434a 100644 --- a/cmd/zetacored/root.go +++ b/cmd/zetacored/root.go @@ -232,11 +232,11 @@ func (ac appCreator) newApp( ) servertypes.Application { baseappOptions := server.DefaultBaseappOptions(appOpts) baseappOptions = append(baseappOptions, func(app *baseapp.BaseApp) { - app.SetMempool(NewSenderNonceMempool( - // should be param - SenderNonceMaxTxOpt(cast.ToInt(1000)), - )) - + // app.SetMempool(NewSenderNonceMempool( + // // should be param + // SenderNonceMaxTxOpt(cast.ToInt(1000)), + // )) + app.SetMempool(DefaultPriorityMempool()) }) skipUpgradeHeights := make(map[int64]bool) for _, h := range cast.ToIntSlice(appOpts.Get(server.FlagUnsafeSkipUpgrades)) { From 99603268f1a23ca179c6e148f44b5fbb18c6694e Mon Sep 17 00:00:00 2001 From: skosito Date: Thu, 9 May 2024 17:41:53 +0200 Subject: [PATCH 03/11] add custom prepare proposal handler --- app/app.go | 4 +- app/custom_proposal_handler.go | 367 ++++++++++++++++++ ...y_mempool.go => priority_nonce_mempool.go} | 12 +- .../{mempool.go => sender_nonce_mempool.go} | 49 ++- 4 files changed, 415 insertions(+), 17 deletions(-) create mode 100644 app/custom_proposal_handler.go rename cmd/zetacored/{priority_mempool.go => priority_nonce_mempool.go} (97%) rename cmd/zetacored/{mempool.go => sender_nonce_mempool.go} (87%) diff --git a/app/app.go b/app/app.go index 1fc5accfb9..2f9dd379ca 100644 --- a/app/app.go +++ b/app/app.go @@ -343,8 +343,8 @@ func New( bApp.SetParamStore(&app.ConsensusParamsKeeper) // DefaultProposalHandler also checks for sigs, probably will need custom - // customProposalHandler := NewCustomProposalHandler(bApp.Mempool(), bApp) - // app.SetPrepareProposal(customProposalHandler.PrepareProposalHandler()) + customProposalHandler := NewCustomProposalHandler(bApp.Mempool(), bApp) + app.SetPrepareProposal(customProposalHandler.PrepareProposalHandler()) // add keepers // use custom Ethermint account for contracts diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go new file mode 100644 index 0000000000..6d02fdf816 --- /dev/null +++ b/app/custom_proposal_handler.go @@ -0,0 +1,367 @@ +package app + +import ( + "fmt" + + "github.com/cockroachdb/errors" + abci "github.com/cometbft/cometbft/abci/types" + evmtypes "github.com/evmos/ethermint/x/evm/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + authante "github.com/cosmos/cosmos-sdk/x/auth/ante" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +type ( + // GasTx defines the contract that a transaction with a gas limit must implement. + GasTx interface { + GetGas() uint64 + } + + // ProposalTxVerifier defines the interface that is implemented by BaseApp, + // that any custom ABCI PrepareProposal and ProcessProposal handler can use + // to verify a transaction. + ProposalTxVerifier interface { + PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) + ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) + } + + // CustomProposalHandler defines the default ABCI PrepareProposal and + // ProcessProposal handlers. + CustomProposalHandler struct { + mempool mempool.Mempool + txVerifier ProposalTxVerifier + txSelector TxSelector + } +) + +func NewCustomProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) *CustomProposalHandler { + return &CustomProposalHandler{ + mempool: mp, + txVerifier: txVerifier, + txSelector: NewDefaultTxSelector(), + } +} + +// SetTxSelector sets the TxSelector function on the CustomProposalHandler. +func (h *CustomProposalHandler) SetTxSelector(ts TxSelector) { + h.txSelector = ts +} + +// PrepareProposalHandler returns the default implementation for processing an +// ABCI proposal. The application's mempool is enumerated and all valid +// transactions are added to the proposal. Transactions are valid if they: +// +// 1) Successfully encode to bytes. +// 2) Are valid (i.e. pass runTx, AnteHandler only). +// +// Enumeration is halted once RequestPrepareProposal.MaxBytes of transactions is +// reached or the mempool is exhausted. +// +// Note: +// +// - Step (2) is identical to the validation step performed in +// DefaultProcessProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +// +// - If no mempool is set or if the mempool is a no-op mempool, the transactions +// requested from CometBFT will simply be returned, which, by default, are in +// FIFO order. +func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { + return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + var maxBlockGas uint64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = uint64(b.MaxGas) + } + + defer h.txSelector.Clear() + + // If the mempool is nil or NoOp we simply return the transactions + // requested from CometBFT, which, by default, should be in FIFO order. + // + // Note, we still need to ensure the transactions returned respect req.MaxTxBytes. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + for _, txBz := range req.Txs { + // XXX: We pass nil as the memTx because we have no way of decoding the + // txBz. We'd need to break (update) the ProposalTxVerifier interface. + // As a result, we CANNOT account for block max gas. + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, nil, txBz) + if stop { + break + } + } + + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } + + iterator := h.mempool.Select(ctx, req.Txs) + selectedTxsSignersSeqs := make(map[string]uint64) + var selectedTxsNums int + for iterator != nil { + memTx := iterator.Tx() + + sendersWithNonce, err := GetSendersWithNonce(memTx) + if err != nil { + panic(fmt.Errorf("failed to get signatures: %w", err)) + } + + // If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before + // so we add them and continue given that we don't need to check the sequence. + shouldAdd := true + txSignersSeqs := make(map[string]uint64) + for _, sig := range sendersWithNonce { + fmt.Println("prepare proposal ", sig.Sender, sig.Nonce) + signer := sig.Sender + nonce := sig.Nonce + seq, ok := selectedTxsSignersSeqs[signer] + if !ok { + txSignersSeqs[signer] = nonce + continue + } + + // If we have seen this signer before in this block, we must make + // sure that the current sequence is seq+1; otherwise is invalid + // and we skip it. + if seq+1 != nonce { + shouldAdd = false + break + } + txSignersSeqs[signer] = nonce + } + if !shouldAdd { + iterator = iterator.Next() + continue + } + + // NOTE: Since transaction verification was already executed in CheckTx, + // which calls mempool.Insert, in theory everything in the pool should be + // valid. But some mempool implementations may insert invalid txs, so we + // check again. + txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) + if err != nil { + err := h.mempool.Remove(memTx) + if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { + panic(err) + } + } else { + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) + if stop { + break + } + + txsLen := len(h.txSelector.SelectedTxs()) + for sender, seq := range txSignersSeqs { + // If txsLen != selectedTxsNums is true, it means that we've + // added a new tx to the selected txs, so we need to update + // the sequence of the sender. + if txsLen != selectedTxsNums { + selectedTxsSignersSeqs[sender] = seq + } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { + // The transaction hasn't been added but it passed the + // verification, so we know that the sequence is correct. + // So we set this sender's sequence to seq-1, in order + // to avoid unnecessary calls to PrepareProposalVerifyTx. + selectedTxsSignersSeqs[sender] = seq - 1 + } + } + selectedTxsNums = txsLen + } + + iterator = iterator.Next() + } + + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } +} + +// ProcessProposalHandler returns the default implementation for processing an +// ABCI proposal. Every transaction in the proposal must pass 2 conditions: +// +// 1. The transaction bytes must decode to a valid transaction. +// 2. The transaction must be valid (i.e. pass runTx, AnteHandler only) +// +// If any transaction fails to pass either condition, the proposal is rejected. +// Note that step (2) is identical to the validation step performed in +// DefaultPrepareProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +func (h *CustomProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { + // If the mempool is nil or NoOp we simply return ACCEPT, + // because PrepareProposal may have included txs that could fail verification. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + return NoOpProcessProposal() + } + + return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { + var totalTxGas uint64 + + var maxBlockGas int64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = b.MaxGas + } + + for _, txBytes := range req.Txs { + tx, err := h.txVerifier.ProcessProposalVerifyTx(txBytes) + if err != nil { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + + if maxBlockGas > 0 { + gasTx, ok := tx.(GasTx) + if ok { + totalTxGas += gasTx.GetGas() + } + + if totalTxGas > uint64(maxBlockGas) { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + } + } + + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always +// return the transactions sent by the client's request. +func NoOpPrepareProposal() sdk.PrepareProposalHandler { + return func(_ sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + return abci.ResponsePrepareProposal{Txs: req.Txs} + } +} + +// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always +// return ACCEPT. +func NoOpProcessProposal() sdk.ProcessProposalHandler { + return func(_ sdk.Context, _ abci.RequestProcessProposal) abci.ResponseProcessProposal { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// TxSelector defines a helper type that assists in selecting transactions during +// mempool transaction selection in PrepareProposal. It keeps track of the total +// number of bytes and total gas of the selected transactions. It also keeps +// track of the selected transactions themselves. +type TxSelector interface { + // SelectedTxs should return a copy of the selected transactions. + SelectedTxs() [][]byte + + // Clear should clear the TxSelector, nulling out all relevant fields. + Clear() + + // SelectTxForProposal should attempt to select a transaction for inclusion in + // a proposal based on inclusion criteria defined by the TxSelector. It must + // return if the caller should halt the transaction selection loop + // (typically over a mempool) or otherwise. + SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool +} + +type defaultTxSelector struct { + totalTxBytes uint64 + totalTxGas uint64 + selectedTxs [][]byte +} + +func NewDefaultTxSelector() TxSelector { + return &defaultTxSelector{} +} + +func (ts *defaultTxSelector) SelectedTxs() [][]byte { + txs := make([][]byte, len(ts.selectedTxs)) + copy(txs, ts.selectedTxs) + return txs +} + +func (ts *defaultTxSelector) Clear() { + ts.totalTxBytes = 0 + ts.totalTxGas = 0 + ts.selectedTxs = nil +} + +func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool { + txSize := uint64(len(txBz)) + + var txGasLimit uint64 + if memTx != nil { + if gasTx, ok := memTx.(GasTx); ok { + txGasLimit = gasTx.GetGas() + } + } + + // only add the transaction to the proposal if we have enough capacity + if (txSize + ts.totalTxBytes) <= maxTxBytes { + // If there is a max block gas limit, add the tx only if the limit has + // not been met. + if maxBlockGas > 0 { + if (txGasLimit + ts.totalTxGas) <= maxBlockGas { + ts.totalTxGas += txGasLimit + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } else { + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } + + // check if we've reached capacity; if so, we cannot select any more transactions + return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas)) +} + +// TODO: move to common place and use in mempool too +func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { + if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { + opts := txWithExtensions.GetExtensionOptions() + if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { + for _, msg := range tx.GetMsgs() { + if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { + + return []SenderWithNonce{ + { + Sender: ethMsg.GetFrom().String(), + Nonce: ethMsg.AsTransaction().Nonce(), + }, + }, nil + } + } + } + } + + return getSendersWithNonceDefault(tx) +} + +type SenderWithNonce struct { + Sender string + Nonce uint64 +} + +func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { + sendersWithNonce := []SenderWithNonce{} + + sigTx, ok := tx.(signing.SigVerifiableTx) + if !ok { + return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) + } + + sigs, err := sigTx.GetSignaturesV2() + if err != nil { + return nil, err + } + + if len(sigs) == 0 { + return nil, fmt.Errorf("tx must have at least one signer") + } + + for _, sig := range sigs { + sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ + Sender: sig.PubKey.Address().String(), + Nonce: sig.Sequence, + }) + } + + return sendersWithNonce, nil +} diff --git a/cmd/zetacored/priority_mempool.go b/cmd/zetacored/priority_nonce_mempool.go similarity index 97% rename from cmd/zetacored/priority_mempool.go rename to cmd/zetacored/priority_nonce_mempool.go index b9fd582d60..8b86026ea9 100644 --- a/cmd/zetacored/priority_mempool.go +++ b/cmd/zetacored/priority_nonce_mempool.go @@ -174,11 +174,16 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { return nil } - sender, nonce, err := getSenderAndNonce(tx) + sendersWithNonce, err := GetSendersWithNonce(tx) if err != nil { return err } + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + + fmt.Println("insert tx ", sender, nonce) + sdkContext := sdk.UnwrapSDKContext(ctx) priority := sdkContext.Priority() fmt.Println("insert priority ", priority) @@ -385,11 +390,14 @@ func (mp *PriorityNonceMempool) CountTx() int { // Remove removes a transaction from the mempool in O(log n) time, returning an // error if unsuccessful. func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { - sender, nonce, err := getSenderAndNonce(tx) + sendersWithNonce, err := GetSendersWithNonce(tx) if err != nil { return err } + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + scoreKey := txMeta{nonce: nonce, sender: sender} score, ok := mp.scores[scoreKey] if !ok { diff --git a/cmd/zetacored/mempool.go b/cmd/zetacored/sender_nonce_mempool.go similarity index 87% rename from cmd/zetacored/mempool.go rename to cmd/zetacored/sender_nonce_mempool.go index 508ec22515..91dd32aba5 100644 --- a/cmd/zetacored/mempool.go +++ b/cmd/zetacored/sender_nonce_mempool.go @@ -13,7 +13,6 @@ import ( "fmt" "math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand - ethcommon "github.com/ethereum/go-ethereum/common" "github.com/huandu/skiplist" sdk "github.com/cosmos/cosmos-sdk/types" @@ -133,11 +132,14 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { return nil } - sender, nonce, err := getSenderAndNonce(tx) + sendersWithNonce, err := GetSendersWithNonce(tx) if err != nil { return err } + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + senderTxs, found := snm.senders[sender] if !found { senderTxs = skiplist.New(skiplist.Uint64) @@ -193,11 +195,14 @@ func (snm *SenderNonceMempool) CountTx() int { // Remove removes a tx from the mempool. It returns an error if the tx does not // have at least one signer or the tx was not found in the pool. func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { - sender, nonce, err := getSenderAndNonce(tx) + sendersWithNonce, err := GetSendersWithNonce(tx) if err != nil { return err } + sender := sendersWithNonce[0].Sender + nonce := sendersWithNonce[0].Nonce + senderTxs, found := snm.senders[sender] if !found { @@ -264,37 +269,55 @@ func removeAtIndex[T any](slice []T, index int) []T { return append(slice[:index], slice[index+1:]...) } -func getSenderAndNonce(tx sdk.Tx) (string, uint64, error) { +func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { opts := txWithExtensions.GetExtensionOptions() if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { for _, msg := range tx.GetMsgs() { if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { - ethAddr := ethcommon.HexToAddress(ethMsg.From) - addr := sdk.AccAddress(ethAddr.Bytes()) - return addr.String(), ethMsg.AsTransaction().Nonce(), nil + + return []SenderWithNonce{ + { + Sender: ethMsg.GetFrom().String(), + Nonce: ethMsg.AsTransaction().Nonce(), + }, + }, nil } } } } - return getSenderAndNonceDefault(tx) + return getSendersWithNonceDefault(tx) +} + +type SenderWithNonce struct { + Sender string + Nonce uint64 } -func getSenderAndNonceDefault(tx sdk.Tx) (string, uint64, error) { +func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { + sendersWithNonce := []SenderWithNonce{} + sigTx, ok := tx.(signing.SigVerifiableTx) if !ok { - return "", 0, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) + return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) } sigs, err := sigTx.GetSignaturesV2() if err != nil { - return "", 0, err + return nil, err } if len(sigs) == 0 { - return "", 0, fmt.Errorf("tx must have at least one signer") + return nil, fmt.Errorf("tx must have at least one signer") + } + + for _, sig := range sigs { + sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ + Sender: sig.PubKey.Address().String(), + Nonce: sig.Sequence, + }) } - return sigs[0].PubKey.Address().String(), sigs[0].Sequence, nil + return sendersWithNonce, nil } From 6413c7ee724e393c11b435e9951a2e664757675a Mon Sep 17 00:00:00 2001 From: skosito Date: Thu, 9 May 2024 21:06:41 +0200 Subject: [PATCH 04/11] cleanup --- app/ante/priority.go | 4 +- app/custom_proposal_handler.go | 60 +--- .../mempool}/priority_nonce_mempool.go | 7 +- app/mempool/senders_with_nonce.go | 63 ++++ cmd/zetacored/root.go | 7 +- cmd/zetacored/sender_nonce_mempool.go | 323 ------------------ 6 files changed, 72 insertions(+), 392 deletions(-) rename {cmd/zetacored => app/mempool}/priority_nonce_mempool.go (97%) create mode 100644 app/mempool/senders_with_nonce.go delete mode 100644 cmd/zetacored/sender_nonce_mempool.go diff --git a/app/ante/priority.go b/app/ante/priority.go index 22c93fa7d0..5b560a1416 100644 --- a/app/ante/priority.go +++ b/app/ante/priority.go @@ -1,6 +1,8 @@ package ante import ( + "math" + sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -22,6 +24,6 @@ func (vad SystemPriorityDecorator) AnteHandle( simulate bool, next sdk.AnteHandler, ) (sdk.Context, error) { - newCtx := ctx.WithPriority(500000000) // arbirtrary value, to be revisited, maybe relative to current context.Priority (eg. double it) + newCtx := ctx.WithPriority(math.MaxInt64) return next(newCtx, tx, simulate) } diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go index 6d02fdf816..28ee43cecd 100644 --- a/app/custom_proposal_handler.go +++ b/app/custom_proposal_handler.go @@ -5,12 +5,10 @@ import ( "github.com/cockroachdb/errors" abci "github.com/cometbft/cometbft/abci/types" - evmtypes "github.com/evmos/ethermint/x/evm/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" - authante "github.com/cosmos/cosmos-sdk/x/auth/ante" - "github.com/cosmos/cosmos-sdk/x/auth/signing" + zetamempool "github.com/zeta-chain/zetacore/app/mempool" ) type ( @@ -103,7 +101,7 @@ func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand for iterator != nil { memTx := iterator.Tx() - sendersWithNonce, err := GetSendersWithNonce(memTx) + sendersWithNonce, err := zetamempool.GetSendersWithNonce(memTx) if err != nil { panic(fmt.Errorf("failed to get signatures: %w", err)) } @@ -311,57 +309,3 @@ func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, // check if we've reached capacity; if so, we cannot select any more transactions return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas)) } - -// TODO: move to common place and use in mempool too -func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { - if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { - opts := txWithExtensions.GetExtensionOptions() - if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { - for _, msg := range tx.GetMsgs() { - if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { - - return []SenderWithNonce{ - { - Sender: ethMsg.GetFrom().String(), - Nonce: ethMsg.AsTransaction().Nonce(), - }, - }, nil - } - } - } - } - - return getSendersWithNonceDefault(tx) -} - -type SenderWithNonce struct { - Sender string - Nonce uint64 -} - -func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { - sendersWithNonce := []SenderWithNonce{} - - sigTx, ok := tx.(signing.SigVerifiableTx) - if !ok { - return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) - } - - sigs, err := sigTx.GetSignaturesV2() - if err != nil { - return nil, err - } - - if len(sigs) == 0 { - return nil, fmt.Errorf("tx must have at least one signer") - } - - for _, sig := range sigs { - sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ - Sender: sig.PubKey.Address().String(), - Nonce: sig.Sequence, - }) - } - - return sendersWithNonce, nil -} diff --git a/cmd/zetacored/priority_nonce_mempool.go b/app/mempool/priority_nonce_mempool.go similarity index 97% rename from cmd/zetacored/priority_nonce_mempool.go rename to app/mempool/priority_nonce_mempool.go index 8b86026ea9..e3c6d81de2 100644 --- a/cmd/zetacored/priority_nonce_mempool.go +++ b/app/mempool/priority_nonce_mempool.go @@ -1,10 +1,7 @@ -// This is fork of SenderNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go) +// This is fork of PriorityNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go) // only change is part where signatures are checked -// this is just for illustration, if we go with similar approach we would use priority nonce mempool, which has same issue but is more complex -// so testing with this one for now - -package main +package mempool import ( "context" diff --git a/app/mempool/senders_with_nonce.go b/app/mempool/senders_with_nonce.go new file mode 100644 index 0000000000..798bcded74 --- /dev/null +++ b/app/mempool/senders_with_nonce.go @@ -0,0 +1,63 @@ +package mempool + +import ( + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + authante "github.com/cosmos/cosmos-sdk/x/auth/ante" + "github.com/cosmos/cosmos-sdk/x/auth/signing" + evmtypes "github.com/evmos/ethermint/x/evm/types" +) + +func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { + if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { + opts := txWithExtensions.GetExtensionOptions() + if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { + for _, msg := range tx.GetMsgs() { + if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { + + return []SenderWithNonce{ + { + Sender: ethMsg.GetFrom().String(), + Nonce: ethMsg.AsTransaction().Nonce(), + }, + }, nil + } + } + } + } + + return getSendersWithNonceDefault(tx) +} + +type SenderWithNonce struct { + Sender string + Nonce uint64 +} + +func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { + sendersWithNonce := []SenderWithNonce{} + + sigTx, ok := tx.(signing.SigVerifiableTx) + if !ok { + return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) + } + + sigs, err := sigTx.GetSignaturesV2() + if err != nil { + return nil, err + } + + if len(sigs) == 0 { + return nil, fmt.Errorf("tx must have at least one signer") + } + + for _, sig := range sigs { + sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ + Sender: sig.PubKey.Address().String(), + Nonce: sig.Sequence, + }) + } + + return sendersWithNonce, nil +} diff --git a/cmd/zetacored/root.go b/cmd/zetacored/root.go index 233d8b434a..1a674e699f 100644 --- a/cmd/zetacored/root.go +++ b/cmd/zetacored/root.go @@ -39,6 +39,7 @@ import ( ethermintclient "github.com/evmos/ethermint/client" "github.com/spf13/cast" "github.com/spf13/cobra" + zetamempool "github.com/zeta-chain/zetacore/app/mempool" ) const EnvPrefix = "zetacore" @@ -232,11 +233,7 @@ func (ac appCreator) newApp( ) servertypes.Application { baseappOptions := server.DefaultBaseappOptions(appOpts) baseappOptions = append(baseappOptions, func(app *baseapp.BaseApp) { - // app.SetMempool(NewSenderNonceMempool( - // // should be param - // SenderNonceMaxTxOpt(cast.ToInt(1000)), - // )) - app.SetMempool(DefaultPriorityMempool()) + app.SetMempool(zetamempool.DefaultPriorityMempool()) }) skipUpgradeHeights := make(map[int64]bool) for _, h := range cast.ToIntSlice(appOpts.Get(server.FlagUnsafeSkipUpgrades)) { diff --git a/cmd/zetacored/sender_nonce_mempool.go b/cmd/zetacored/sender_nonce_mempool.go deleted file mode 100644 index 91dd32aba5..0000000000 --- a/cmd/zetacored/sender_nonce_mempool.go +++ /dev/null @@ -1,323 +0,0 @@ -// This is fork of SenderNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/sender_nonce.go) -// only change is part where signatures are checked - -// this is just for illustration, if we go with similar approach we would use priority nonce mempool, which has same issue but is more complex -// so testing with this one for now - -package main - -import ( - "context" - crand "crypto/rand" // #nosec // crypto/rand is used for seed generation - "encoding/binary" - "fmt" - "math/rand" // #nosec // math/rand is used for random selection and seeded from crypto/rand - - "github.com/huandu/skiplist" - - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/mempool" - authante "github.com/cosmos/cosmos-sdk/x/auth/ante" - "github.com/cosmos/cosmos-sdk/x/auth/signing" - evmtypes "github.com/evmos/ethermint/x/evm/types" -) - -var ( - _ mempool.Mempool = (*SenderNonceMempool)(nil) - _ mempool.Iterator = (*senderNonceMempoolIterator)(nil) -) - -var DefaultMaxTx = 0 - -// SenderNonceMempool is a mempool that prioritizes transactions within a sender -// by nonce, the lowest first, but selects a random sender on each iteration. -// The mempool is iterated by: -// -// 1) Maintaining a separate list of nonce ordered txs per sender -// 2) For each select iteration, randomly choose a sender and pick the next nonce ordered tx from their list -// 3) Repeat 1,2 until the mempool is exhausted -// -// Note that PrepareProposal could choose to stop iteration before reaching the -// end if maxBytes is reached. -type SenderNonceMempool struct { - senders map[string]*skiplist.SkipList - rnd *rand.Rand - maxTx int - existingTx map[txKey]bool -} - -type SenderNonceOptions func(mp *SenderNonceMempool) - -type txKey struct { - address string - nonce uint64 -} - -// NewSenderNonceMempool creates a new mempool that prioritizes transactions by -// nonce, the lowest first, picking a random sender on each iteration. -func NewSenderNonceMempool(opts ...SenderNonceOptions) *SenderNonceMempool { - senderMap := make(map[string]*skiplist.SkipList) - existingTx := make(map[txKey]bool) - snp := &SenderNonceMempool{ - senders: senderMap, - maxTx: DefaultMaxTx, - existingTx: existingTx, - } - - var seed int64 - err := binary.Read(crand.Reader, binary.BigEndian, &seed) - if err != nil { - panic(err) - } - - snp.setSeed(seed) - - for _, opt := range opts { - opt(snp) - } - - return snp -} - -// SenderNonceSeedOpt Option To add a Seed for random type when calling the -// constructor NewSenderNonceMempool. -// -// Example: -// -// random_seed := int64(1000) -// NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed)) -func SenderNonceSeedOpt(seed int64) SenderNonceOptions { - return func(snp *SenderNonceMempool) { - snp.setSeed(seed) - } -} - -// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor -// NewSenderNonceMempool. -// -// Example: -// -// NewSenderNonceMempool(SenderNonceMaxTxOpt(100)) -func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions { - return func(snp *SenderNonceMempool) { - snp.maxTx = maxTx - } -} - -func (snm *SenderNonceMempool) setSeed(seed int64) { - s1 := rand.NewSource(seed) - snm.rnd = rand.New(s1) //#nosec // math/rand is seeded from crypto/rand by default -} - -// NextSenderTx returns the next transaction for a given sender by nonce order, -// i.e. the next valid transaction for the sender. If no such transaction exists, -// nil will be returned. -func (mp *SenderNonceMempool) NextSenderTx(sender string) sdk.Tx { - senderIndex, ok := mp.senders[sender] - if !ok { - return nil - } - - cursor := senderIndex.Front() - return cursor.Value.(sdk.Tx) -} - -// Insert adds a tx to the mempool. It returns an error if the tx does not have -// at least one signer. Note, priority is ignored. -func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { - if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx { - return mempool.ErrMempoolTxMaxCapacity - } - if snm.maxTx < 0 { - return nil - } - - sendersWithNonce, err := GetSendersWithNonce(tx) - if err != nil { - return err - } - - sender := sendersWithNonce[0].Sender - nonce := sendersWithNonce[0].Nonce - - senderTxs, found := snm.senders[sender] - if !found { - senderTxs = skiplist.New(skiplist.Uint64) - snm.senders[sender] = senderTxs - } - - senderTxs.Set(nonce, tx) - - key := txKey{nonce: nonce, address: sender} - snm.existingTx[key] = true - - return nil -} - -// Select returns an iterator ordering transactions the mempool with the lowest -// nonce of a random selected sender first. -// -// NOTE: It is not safe to use this iterator while removing transactions from -// the underlying mempool. -func (snm *SenderNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { - var senders []string - - senderCursors := make(map[string]*skiplist.Element) - orderedSenders := skiplist.New(skiplist.String) - - // #nosec - for s := range snm.senders { - orderedSenders.Set(s, s) - } - - s := orderedSenders.Front() - for s != nil { - sender := s.Value.(string) - senders = append(senders, sender) - senderCursors[sender] = snm.senders[sender].Front() - s = s.Next() - } - - iter := &senderNonceMempoolIterator{ - senders: senders, - rnd: snm.rnd, - senderCursors: senderCursors, - } - - return iter.Next() -} - -// CountTx returns the total count of txs in the mempool. -func (snm *SenderNonceMempool) CountTx() int { - return len(snm.existingTx) -} - -// Remove removes a tx from the mempool. It returns an error if the tx does not -// have at least one signer or the tx was not found in the pool. -func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { - sendersWithNonce, err := GetSendersWithNonce(tx) - if err != nil { - return err - } - - sender := sendersWithNonce[0].Sender - nonce := sendersWithNonce[0].Nonce - - senderTxs, found := snm.senders[sender] - - if !found { - return mempool.ErrTxNotFound - } - - res := senderTxs.Remove(nonce) - - if res == nil { - return mempool.ErrTxNotFound - } - - if senderTxs.Len() == 0 { - delete(snm.senders, sender) - } - - key := txKey{nonce: nonce, address: sender} - delete(snm.existingTx, key) - - return nil -} - -type senderNonceMempoolIterator struct { - rnd *rand.Rand - currentTx *skiplist.Element - senders []string - senderCursors map[string]*skiplist.Element -} - -// Next returns the next iterator state which will contain a tx with the next -// smallest nonce of a randomly selected sender. -func (i *senderNonceMempoolIterator) Next() mempool.Iterator { - for len(i.senders) > 0 { - senderIndex := i.rnd.Intn(len(i.senders)) - sender := i.senders[senderIndex] - senderCursor, found := i.senderCursors[sender] - if !found { - i.senders = removeAtIndex(i.senders, senderIndex) - continue - } - - if nextCursor := senderCursor.Next(); nextCursor != nil { - i.senderCursors[sender] = nextCursor - } else { - i.senders = removeAtIndex(i.senders, senderIndex) - } - - return &senderNonceMempoolIterator{ - senders: i.senders, - currentTx: senderCursor, - rnd: i.rnd, - senderCursors: i.senderCursors, - } - } - - return nil -} - -func (i *senderNonceMempoolIterator) Tx() sdk.Tx { - return i.currentTx.Value.(sdk.Tx) -} - -func removeAtIndex[T any](slice []T, index int) []T { - return append(slice[:index], slice[index+1:]...) -} - -func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { - if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { - opts := txWithExtensions.GetExtensionOptions() - if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { - for _, msg := range tx.GetMsgs() { - if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { - - return []SenderWithNonce{ - { - Sender: ethMsg.GetFrom().String(), - Nonce: ethMsg.AsTransaction().Nonce(), - }, - }, nil - } - } - } - } - - return getSendersWithNonceDefault(tx) -} - -type SenderWithNonce struct { - Sender string - Nonce uint64 -} - -func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { - sendersWithNonce := []SenderWithNonce{} - - sigTx, ok := tx.(signing.SigVerifiableTx) - if !ok { - return nil, fmt.Errorf("tx of type %T does not implement SigVerifiableTx", tx) - } - - sigs, err := sigTx.GetSignaturesV2() - if err != nil { - return nil, err - } - - if len(sigs) == 0 { - return nil, fmt.Errorf("tx must have at least one signer") - } - - for _, sig := range sigs { - sendersWithNonce = append(sendersWithNonce, SenderWithNonce{ - Sender: sig.PubKey.Address().String(), - Nonce: sig.Sequence, - }) - } - - return sendersWithNonce, nil -} From a400d1afa322caefd04b234b745af5330e7fc60c Mon Sep 17 00:00:00 2001 From: skosito Date: Fri, 10 May 2024 15:26:51 +0200 Subject: [PATCH 05/11] Add comments and todos --- app/custom_proposal_handler.go | 4 ++++ app/mempool/priority_nonce_mempool.go | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go index 28ee43cecd..0639e89125 100644 --- a/app/custom_proposal_handler.go +++ b/app/custom_proposal_handler.go @@ -1,5 +1,9 @@ package app +// This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/baseapp/abci_utils.go +// Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. +// TODO: remove this once we upgrade cosmos + import ( "fmt" diff --git a/app/mempool/priority_nonce_mempool.go b/app/mempool/priority_nonce_mempool.go index e3c6d81de2..5d58c951d2 100644 --- a/app/mempool/priority_nonce_mempool.go +++ b/app/mempool/priority_nonce_mempool.go @@ -1,5 +1,6 @@ -// This is fork of PriorityNonceMempool from cosmos sdk 0.47 (check: https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go) -// only change is part where signatures are checked +// This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go +// Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. +// TODO: remove this once we upgrade cosmos package mempool From dd27203626b855cf9cfc089fd6a690c8123718af Mon Sep 17 00:00:00 2001 From: skosito Date: Fri, 10 May 2024 15:35:26 +0200 Subject: [PATCH 06/11] cleanup --- app/app.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/app.go b/app/app.go index 2f9dd379ca..f817f80783 100644 --- a/app/app.go +++ b/app/app.go @@ -342,7 +342,6 @@ func New( app.ConsensusParamsKeeper = consensusparamkeeper.NewKeeper(appCodec, keys[consensusparamtypes.StoreKey], authAddr) bApp.SetParamStore(&app.ConsensusParamsKeeper) - // DefaultProposalHandler also checks for sigs, probably will need custom customProposalHandler := NewCustomProposalHandler(bApp.Mempool(), bApp) app.SetPrepareProposal(customProposalHandler.PrepareProposalHandler()) From cc66564d5a0ed6bfb143fd3c9b9d249948e7a0dd Mon Sep 17 00:00:00 2001 From: skosito Date: Fri, 10 May 2024 20:14:41 +0200 Subject: [PATCH 07/11] Fix PR comments and build --- ...ity.go => system_tx_priority_decorator.go} | 0 app/ante/system_tx_priority_decorator_test.go | 45 +++++++++++++++++++ app/custom_proposal_handler.go | 6 ++- app/mempool/priority_nonce_mempool.go | 6 +-- app/mempool/senders_with_nonce.go | 10 ++++- changelog.md | 1 + 6 files changed, 62 insertions(+), 6 deletions(-) rename app/ante/{priority.go => system_tx_priority_decorator.go} (100%) create mode 100644 app/ante/system_tx_priority_decorator_test.go diff --git a/app/ante/priority.go b/app/ante/system_tx_priority_decorator.go similarity index 100% rename from app/ante/priority.go rename to app/ante/system_tx_priority_decorator.go diff --git a/app/ante/system_tx_priority_decorator_test.go b/app/ante/system_tx_priority_decorator_test.go new file mode 100644 index 0000000000..f777a98ba1 --- /dev/null +++ b/app/ante/system_tx_priority_decorator_test.go @@ -0,0 +1,45 @@ +package ante_test + +import ( + "math" + "math/rand" + "testing" + "time" + + simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + "github.com/zeta-chain/zetacore/app" + "github.com/zeta-chain/zetacore/app/ante" + "github.com/zeta-chain/zetacore/testutil/sample" +) + +func TestSystemTxPriorityDecorator_AnteHandle(t *testing.T) { + txConfig := app.MakeEncodingConfig().TxConfig + + testPrivKey, _ := sample.PrivKeyAddressPair() + + decorator := ante.NewSystemPriorityDecorator() + mmd := MockAnteHandler{} + // set priority to 10 before ante handler + ctx := sdk.Context{}.WithIsCheckTx(true).WithPriority(10) + + tx, err := simtestutil.GenSignedMockTx( + rand.New(rand.NewSource(time.Now().UnixNano())), + txConfig, + []sdk.Msg{}, + sdk.NewCoins(), + simtestutil.DefaultGenTxGas, + "testing-chain-id", + []uint64{0}, + []uint64{0}, + testPrivKey, + ) + require.NoError(t, err) + ctx, err = decorator.AnteHandle(ctx, tx, false, mmd.AnteHandle) + require.NoError(t, err) + + // check that priority is set to max int64 + priorityAfter := ctx.Priority() + require.Equal(t, math.MaxInt64, int(priorityAfter)) +} diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go index 0639e89125..a35b6d5284 100644 --- a/app/custom_proposal_handler.go +++ b/app/custom_proposal_handler.go @@ -2,7 +2,7 @@ package app // This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/baseapp/abci_utils.go // Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. -// TODO: remove this once we upgrade cosmos +// TODO: remove this once cosmos is upgraded: https://github.com/zeta-chain/node/issues/2156 import ( "fmt" @@ -75,6 +75,7 @@ func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { var maxBlockGas uint64 if b := ctx.ConsensusParams().Block; b != nil { + // #nosec G701 range checked, cosmos-sdk forked code maxBlockGas = uint64(b.MaxGas) } @@ -90,6 +91,7 @@ func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand // XXX: We pass nil as the memTx because we have no way of decoding the // txBz. We'd need to break (update) the ProposalTxVerifier interface. // As a result, we CANNOT account for block max gas. + // #nosec G701 range checked, cosmos-sdk forked code stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, nil, txBz) if stop { break @@ -149,6 +151,7 @@ func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand panic(err) } } else { + // #nosec G701 range checked, cosmos-sdk forked code stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) if stop { break @@ -218,6 +221,7 @@ func (h *CustomProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHand totalTxGas += gasTx.GetGas() } + // #nosec G701 range checked, cosmos-sdk forked code if totalTxGas > uint64(maxBlockGas) { return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} } diff --git a/app/mempool/priority_nonce_mempool.go b/app/mempool/priority_nonce_mempool.go index 5d58c951d2..9b1a53091b 100644 --- a/app/mempool/priority_nonce_mempool.go +++ b/app/mempool/priority_nonce_mempool.go @@ -1,6 +1,6 @@ // This proposal handler is taken from https://github.com/cosmos/cosmos-sdk/blob/v0.47.10/types/mempool/priority_nonce.go // Only difference is extraction of senders and nonce from tx. In latest version of cosmos, there is a way to provide adapter for this, but in 0.47.10 this is the only way. -// TODO: remove this once we upgrade cosmos +// TODO: remove this once cosmos is upgraded: https://github.com/zeta-chain/node/issues/2156 package mempool @@ -422,7 +422,7 @@ func IsEmpty(mempool mempool.Mempool) error { return fmt.Errorf("priorityIndex not empty") } - var countKeys []int64 + var countKeys = make([]int64, 0, len(mp.priorityCounts)) for k := range mp.priorityCounts { countKeys = append(countKeys, k) } @@ -433,7 +433,7 @@ func IsEmpty(mempool mempool.Mempool) error { } } - var senderKeys []string + var senderKeys = make([]string, 0, len(mp.senderIndices)) for k := range mp.senderIndices { senderKeys = append(senderKeys, k) } diff --git a/app/mempool/senders_with_nonce.go b/app/mempool/senders_with_nonce.go index 798bcded74..1e3e74e221 100644 --- a/app/mempool/senders_with_nonce.go +++ b/app/mempool/senders_with_nonce.go @@ -1,3 +1,5 @@ +// TODO: use with signer extractor once available https://github.com/zeta-chain/node/issues/2156 + package mempool import ( @@ -9,13 +11,16 @@ import ( evmtypes "github.com/evmos/ethermint/x/evm/types" ) +// GetSendersWithNonce is used to extract sender and nonce information txs +// if tx is ethermint, it is extracted using from and nonce field +// if it's cosmos tx, default cosmos way using signatures is used func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { + const extensionOptionsEthereumTxTypeUrl = "/ethermint.evm.v1.ExtensionOptionsEthereumTx" if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { opts := txWithExtensions.GetExtensionOptions() - if len(opts) > 0 && opts[0].GetTypeUrl() == "/ethermint.evm.v1.ExtensionOptionsEthereumTx" { + if len(opts) > 0 && opts[0].GetTypeUrl() == extensionOptionsEthereumTxTypeUrl { for _, msg := range tx.GetMsgs() { if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { - return []SenderWithNonce{ { Sender: ethMsg.GetFrom().String(), @@ -35,6 +40,7 @@ type SenderWithNonce struct { Nonce uint64 } +// getSendersWithNonceDefault gets senders and nonces from signatures in cosmos txs func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { sendersWithNonce := []SenderWithNonce{} diff --git a/changelog.md b/changelog.md index 8eedfe2ac3..cc192390b3 100644 --- a/changelog.md +++ b/changelog.md @@ -13,6 +13,7 @@ * [2100](https://github.com/zeta-chain/node/pull/2100) - cosmos v0.47 upgrade * [2145](https://github.com/zeta-chain/node/pull/2145) - add `ibc` and `ibc-transfer` modules * [2135](https://github.com/zeta-chain/node/pull/2135) - add develop build version logic +* [2152](https://github.com/zeta-chain/node/pull/2152) - custom priority nonce mempool ### Refactor From 750b8a4952a52d1fe908888169cc03b74646e4d3 Mon Sep 17 00:00:00 2001 From: skosito Date: Mon, 13 May 2024 12:12:18 +0200 Subject: [PATCH 08/11] Lint fix --- app/mempool/senders_with_nonce.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/mempool/senders_with_nonce.go b/app/mempool/senders_with_nonce.go index 1e3e74e221..360580f2b5 100644 --- a/app/mempool/senders_with_nonce.go +++ b/app/mempool/senders_with_nonce.go @@ -15,10 +15,10 @@ import ( // if tx is ethermint, it is extracted using from and nonce field // if it's cosmos tx, default cosmos way using signatures is used func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { - const extensionOptionsEthereumTxTypeUrl = "/ethermint.evm.v1.ExtensionOptionsEthereumTx" + const extensionOptionsEthereumTxTypeURL = "/ethermint.evm.v1.ExtensionOptionsEthereumTx" if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { opts := txWithExtensions.GetExtensionOptions() - if len(opts) > 0 && opts[0].GetTypeUrl() == extensionOptionsEthereumTxTypeUrl { + if len(opts) > 0 && opts[0].GetTypeUrl() == extensionOptionsEthereumTxTypeURL { for _, msg := range tx.GetMsgs() { if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { return []SenderWithNonce{ From 300be54a5d0ed91519c8d89f5d645c8207d205f3 Mon Sep 17 00:00:00 2001 From: skosito Date: Tue, 14 May 2024 21:23:39 +0200 Subject: [PATCH 09/11] PR comment --- app/mempool/senders_with_nonce.go | 32 ++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/app/mempool/senders_with_nonce.go b/app/mempool/senders_with_nonce.go index 360580f2b5..9ffc272f8c 100644 --- a/app/mempool/senders_with_nonce.go +++ b/app/mempool/senders_with_nonce.go @@ -19,20 +19,26 @@ func GetSendersWithNonce(tx sdk.Tx) ([]SenderWithNonce, error) { if txWithExtensions, ok := tx.(authante.HasExtensionOptionsTx); ok { opts := txWithExtensions.GetExtensionOptions() if len(opts) > 0 && opts[0].GetTypeUrl() == extensionOptionsEthereumTxTypeURL { - for _, msg := range tx.GetMsgs() { - if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { - return []SenderWithNonce{ - { - Sender: ethMsg.GetFrom().String(), - Nonce: ethMsg.AsTransaction().Nonce(), - }, - }, nil - } - } + return getSendersWithNonceEthermint(tx) } } - return getSendersWithNonceDefault(tx) + return getSendersWithNonceCosmos(tx) +} + +// getSendersWithNonceEthermint gets senders and nonces from signatures in ethertmint txs +func getSendersWithNonceEthermint(tx sdk.Tx) ([]SenderWithNonce, error) { + for _, msg := range tx.GetMsgs() { + if ethMsg, ok := msg.(*evmtypes.MsgEthereumTx); ok { + return []SenderWithNonce{ + { + Sender: ethMsg.GetFrom().String(), + Nonce: ethMsg.AsTransaction().Nonce(), + }, + }, nil + } + } + return nil, fmt.Errorf("ethermint sender with nonce not found") } type SenderWithNonce struct { @@ -40,8 +46,8 @@ type SenderWithNonce struct { Nonce uint64 } -// getSendersWithNonceDefault gets senders and nonces from signatures in cosmos txs -func getSendersWithNonceDefault(tx sdk.Tx) ([]SenderWithNonce, error) { +// getSendersWithNonceCosmos gets senders and nonces from signatures in cosmos txs +func getSendersWithNonceCosmos(tx sdk.Tx) ([]SenderWithNonce, error) { sendersWithNonce := []SenderWithNonce{} sigTx, ok := tx.(signing.SigVerifiableTx) From f58ac67b1b7d5b26702c9876ac7834a5efa0f728 Mon Sep 17 00:00:00 2001 From: skosito Date: Tue, 14 May 2024 21:27:14 +0200 Subject: [PATCH 10/11] Remove fmt logs --- app/custom_proposal_handler.go | 1 - app/mempool/priority_nonce_mempool.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/app/custom_proposal_handler.go b/app/custom_proposal_handler.go index a35b6d5284..e6f9e4512f 100644 --- a/app/custom_proposal_handler.go +++ b/app/custom_proposal_handler.go @@ -117,7 +117,6 @@ func (h *CustomProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHand shouldAdd := true txSignersSeqs := make(map[string]uint64) for _, sig := range sendersWithNonce { - fmt.Println("prepare proposal ", sig.Sender, sig.Nonce) signer := sig.Sender nonce := sig.Nonce seq, ok := selectedTxsSignersSeqs[signer] diff --git a/app/mempool/priority_nonce_mempool.go b/app/mempool/priority_nonce_mempool.go index 9b1a53091b..cb6ed8748f 100644 --- a/app/mempool/priority_nonce_mempool.go +++ b/app/mempool/priority_nonce_mempool.go @@ -180,11 +180,8 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { sender := sendersWithNonce[0].Sender nonce := sendersWithNonce[0].Nonce - fmt.Println("insert tx ", sender, nonce) - sdkContext := sdk.UnwrapSDKContext(ctx) priority := sdkContext.Priority() - fmt.Println("insert priority ", priority) key := txMeta{nonce: nonce, priority: priority, sender: sender} senderIndex, ok := mp.senderIndices[sender] From 90d9eb43b73b39cc46f37964c3a01b846713311c Mon Sep 17 00:00:00 2001 From: skosito Date: Tue, 14 May 2024 22:03:59 +0200 Subject: [PATCH 11/11] Try to hardcode gosec lint version --- .github/workflows/publish-release.yml | 2 +- .github/workflows/sast-linters.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index 532962d547..d2b75106e4 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -52,7 +52,7 @@ jobs: - name: Run Gosec Security Scanner if: ${{ github.event.inputs.skip_checks != 'true' }} - uses: securego/gosec@master + uses: securego/gosec@v2.19.0 with: args: ./... diff --git a/.github/workflows/sast-linters.yml b/.github/workflows/sast-linters.yml index bd8910893f..b2f4d7f646 100644 --- a/.github/workflows/sast-linters.yml +++ b/.github/workflows/sast-linters.yml @@ -30,7 +30,7 @@ jobs: go-version: '1.20' - name: Run Gosec Security Scanner - uses: securego/gosec@master + uses: securego/gosec@v2.19.0 with: args: ./...