diff --git a/module/x/gravity/keeper/batch.go b/module/x/gravity/keeper/batch.go index b901e7a2c..73be206ac 100644 --- a/module/x/gravity/keeper/batch.go +++ b/module/x/gravity/keeper/batch.go @@ -35,7 +35,7 @@ func (k Keeper) BuildOutgoingTXBatch( if lastBatch != nil { // this traverses the current tx pool for this token type and determines what // fees a hypothetical batch would have if created - currentFees := k.GetBatchFeesByTokenType(ctx, contractAddress, maxElements) + currentFees := k.GetBatchFeeByTokenType(ctx, contractAddress, maxElements) if currentFees == nil { return nil, sdkerrors.Wrap(types.ErrInvalid, "error getting fees from tx pool") } @@ -105,11 +105,6 @@ func (k Keeper) OutgoingTxBatchExecuted(ctx sdk.Context, tokenContract string, n panic(fmt.Sprintf("unknown batch nonce for outgoing tx batch %s %d", tokenContract, nonce)) } - // cleanup outgoing TX pool, while these transactions where hidden from GetPoolTransactions - // they still exist in the pool and need to be cleaned up. - for _, tx := range b.Transactions { - k.removePoolEntry(ctx, tx.Id) - } // Iterate through remaining batches k.IterateOutgoingTXBatches(ctx, func(key []byte, iter_batch *types.OutgoingTxBatch) bool { // If the iterated batches nonce is lower than the one that was just executed, cancel it @@ -163,10 +158,10 @@ func (k Keeper) pickUnbatchedTX( maxElements uint) ([]*types.OutgoingTransferTx, error) { var selectedTx []*types.OutgoingTransferTx var err error - k.IterateOutgoingPoolByFee(ctx, contractAddress, func(txID uint64, tx *types.OutgoingTransferTx) bool { + k.IterateUnbatchedTransactionsByContract(ctx, contractAddress, func(_ []byte, tx *types.OutgoingTransferTx) bool { if tx != nil && tx.Erc20Fee != nil { selectedTx = append(selectedTx, tx) - err = k.removeFromUnbatchedTXIndex(ctx, *tx.Erc20Fee, txID) + err = k.removeUnbatchedTX(ctx, *tx.Erc20Fee, tx.Id) return err != nil || uint(len(selectedTx)) == maxElements } else { panic("tx and fee should never be nil!") @@ -199,8 +194,10 @@ func (k Keeper) CancelOutgoingTXBatch(ctx sdk.Context, tokenContract string, non return types.ErrUnknown } for _, tx := range batch.Transactions { - tx.Erc20Fee.Contract = tokenContract - k.prependToUnbatchedTXIndex(ctx, tokenContract, *tx.Erc20Fee, tx.Id) + err := k.addUnbatchedTX(ctx, tx) + if err != nil { + panic(sdkerrors.Wrapf(err, "unable to add batched transaction back into pool %v", tx)) + } } // Delete batch since it is finished diff --git a/module/x/gravity/keeper/batch_test.go b/module/x/gravity/keeper/batch_test.go index 5d54a6091..8cb18ce69 100644 --- a/module/x/gravity/keeper/batch_test.go +++ b/module/x/gravity/keeper/batch_test.go @@ -1,6 +1,7 @@ package keeper import ( + "fmt" "math/rand" "testing" "time" @@ -41,6 +42,12 @@ func TestBatches(t *testing.T) { fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin() _, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee) require.NoError(t, err) + ctx.Logger().Info(fmt.Sprintf("Created transaction %v with amount %v and fee %v", i, amount, fee)) + // Should create: + // 1: tx amount is 100, fee is 2, id is 1 + // 2: tx amount is 101, fee is 3, id is 2 + // 3: tx amount is 102, fee is 2, id is 3 + // 4: tx amount is 103, fee is 1, id is 4 } // when @@ -53,6 +60,8 @@ func TestBatches(t *testing.T) { // then batch is persisted gotFirstBatch := input.GravityKeeper.GetOutgoingTXBatch(ctx, firstBatch.TokenContract, firstBatch.BatchNonce) require.NotNil(t, gotFirstBatch) + // Should have txs 2: and 3: from above, as ties in fees are broken by transaction index + ctx.Logger().Info(fmt.Sprintf("found batch %+v", gotFirstBatch)) expFirstBatch := &types.OutgoingTxBatch{ BatchNonce: 1, @@ -65,11 +74,11 @@ func TestBatches(t *testing.T) { Erc20Token: types.NewERC20Token(101, myTokenContractAddr), }, { - Id: 1, + Id: 3, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(100, myTokenContractAddr), + Erc20Token: types.NewERC20Token(102, myTokenContractAddr), }, }, TokenContract: myTokenContractAddr, @@ -78,18 +87,15 @@ func TestBatches(t *testing.T) { assert.Equal(t, expFirstBatch, gotFirstBatch) // and verify remaining available Tx in the pool - var gotUnbatchedTx []*types.OutgoingTransferTx - input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool { - gotUnbatchedTx = append(gotUnbatchedTx, tx) - return false - }) + // Should still have 1: and 4: above + gotUnbatchedTx := input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr) expUnbatchedTx := []*types.OutgoingTransferTx{ { - Id: 3, + Id: 1, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(102, myTokenContractAddr), + Erc20Token: types.NewERC20Token(100, myTokenContractAddr), }, { Id: 4, @@ -111,6 +117,9 @@ func TestBatches(t *testing.T) { fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin() _, err = input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee) require.NoError(t, err) + // Creates the following: + // 5: amount 100, fee 4, id 5 + // 6: amount 101, fee 5, id 6 } // create the more profitable batch @@ -120,6 +129,7 @@ func TestBatches(t *testing.T) { require.NoError(t, err) // check that the more profitable batch has the right txs in it + // Should only have 5: and 6: above expSecondBatch := &types.OutgoingTxBatch{ BatchNonce: 2, Transactions: []*types.OutgoingTransferTx{ @@ -155,11 +165,7 @@ func TestBatches(t *testing.T) { require.Nil(t, gotSecondBatch) // check that txs from first batch have been freed - gotUnbatchedTx = nil - input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool { - gotUnbatchedTx = append(gotUnbatchedTx, tx) - return false - }) + gotUnbatchedTx = input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr) expUnbatchedTx = []*types.OutgoingTransferTx{ { Id: 2, @@ -169,18 +175,18 @@ func TestBatches(t *testing.T) { Erc20Token: types.NewERC20Token(101, myTokenContractAddr), }, { - Id: 1, + Id: 3, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(100, myTokenContractAddr), + Erc20Token: types.NewERC20Token(102, myTokenContractAddr), }, { - Id: 3, + Id: 1, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(102, myTokenContractAddr), + Erc20Token: types.NewERC20Token(100, myTokenContractAddr), }, { Id: 4, @@ -264,11 +270,7 @@ func TestBatchesFullCoins(t *testing.T) { assert.Equal(t, expFirstBatch, gotFirstBatch) // and verify remaining available Tx in the pool - var gotUnbatchedTx []*types.OutgoingTransferTx - input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool { - gotUnbatchedTx = append(gotUnbatchedTx, tx) - return false - }) + gotUnbatchedTx := input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr) expUnbatchedTx := []*types.OutgoingTransferTx{ { Id: 1, @@ -341,11 +343,7 @@ func TestBatchesFullCoins(t *testing.T) { require.Nil(t, gotSecondBatch) // check that txs from first batch have been freed - gotUnbatchedTx = nil - input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool { - gotUnbatchedTx = append(gotUnbatchedTx, tx) - return false - }) + gotUnbatchedTx = input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr) expUnbatchedTx = []*types.OutgoingTransferTx{ { Id: 2, @@ -489,23 +487,34 @@ func TestPoolTxRefund(t *testing.T) { fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin() _, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee) require.NoError(t, err) + // Should have created: + // 1: amount 100, fee 2 + // 2: amount 101, fee 3 + // 3: amount 102, fee 2 + // 4: amount 103, fee 1 } // when ctx = ctx.WithBlockTime(now) // tx batch size is 2, so that some of them stay behind - _, err := input.GravityKeeper.BuildOutgoingTXBatch(ctx, myTokenContractAddr, 2) + // Should have 2: and 3: from above + batch, err := input.GravityKeeper.BuildOutgoingTXBatch(ctx, myTokenContractAddr, 2) + batch = batch + unbatched := input.GravityKeeper.GetUnbatchedTransactions(ctx) + unbatched = unbatched require.NoError(t, err) // try to refund a tx that's in a batch - err1 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 1, mySender) + err1 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 3, mySender) require.Error(t, err1) // try to refund somebody else's tx err2 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 4, notMySender) require.Error(t, err2) + prebalances := input.BankKeeper.GetAllBalances(ctx, mySender) + prebalances = prebalances // try to refund a tx that's in the pool err3 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 4, mySender) require.NoError(t, err3) diff --git a/module/x/gravity/keeper/genesis.go b/module/x/gravity/keeper/genesis.go index 773cf9ac2..07278e228 100644 --- a/module/x/gravity/keeper/genesis.go +++ b/module/x/gravity/keeper/genesis.go @@ -45,7 +45,7 @@ func InitGenesis(ctx sdk.Context, k Keeper, data types.GenesisState) { // reset pool transactions in state for _, tx := range data.UnbatchedTransfers { - if err := k.setPoolEntry(ctx, tx); err != nil { + if err := k.addUnbatchedTX(ctx, tx); err != nil { panic(err) } } @@ -148,7 +148,7 @@ func ExportGenesis(ctx sdk.Context, k Keeper) types.GenesisState { delegates = k.GetDelegateKeys(ctx) lastobserved = k.GetLastObservedEventNonce(ctx) erc20ToDenoms = []*types.ERC20ToDenom{} - unbatchedTransfers = k.GetPoolTransactions(ctx) + unbatchedTransfers = k.GetUnbatchedTransactions(ctx) ) // export valset confirmations from state diff --git a/module/x/gravity/keeper/grpc_query.go b/module/x/gravity/keeper/grpc_query.go index 2d6a1e4ca..b66a6edf1 100644 --- a/module/x/gravity/keeper/grpc_query.go +++ b/module/x/gravity/keeper/grpc_query.go @@ -341,7 +341,7 @@ func (k Keeper) GetPendingSendToEth( req *types.QueryPendingSendToEth) (*types.QueryPendingSendToEthResponse, error) { ctx := sdk.UnwrapSDKContext(c) batches := k.GetOutgoingTxBatches(ctx) - unbatchedTx := k.GetPoolTransactions(ctx) + unbatchedTx := k.GetUnbatchedTransactions(ctx) senderAddress := req.SenderAddress var res *types.QueryPendingSendToEthResponse diff --git a/module/x/gravity/keeper/pool.go b/module/x/gravity/keeper/pool.go index 4acf03530..aeaad8b6a 100644 --- a/module/x/gravity/keeper/pool.go +++ b/module/x/gravity/keeper/pool.go @@ -3,11 +3,9 @@ package keeper import ( "encoding/binary" "fmt" - "math/big" "sort" "strconv" - "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -66,13 +64,8 @@ func (k Keeper) AddToOutgoingPool(ctx sdk.Context, sender sdk.AccAddress, counte Erc20Fee: erc20Fee, } - // set the outgoing tx in the pool index - if err := k.setPoolEntry(ctx, outgoing); err != nil { - return 0, err - } - // add a second index with the fee - k.appendToUnbatchedTXIndex(ctx, tokenContract, *erc20Fee, nextID) + k.addUnbatchedTX(ctx, outgoing) // todo: add second index for sender so that we can easily query: give pending Tx by sender // todo: what about a second index for receiver? @@ -96,7 +89,7 @@ func (k Keeper) AddToOutgoingPool(ctx sdk.Context, sender sdk.AccAddress, counte // - issues the tokens back to the sender func (k Keeper) RemoveFromOutgoingPoolAndRefund(ctx sdk.Context, txId uint64, sender sdk.AccAddress) error { // check that we actually have a tx with that id and what it's details are - tx, err := k.getPoolEntry(ctx, txId) + tx, err := k.GetUnbatchedTxById(ctx, txId) if err != nil { return err } @@ -117,26 +110,13 @@ func (k Keeper) RemoveFromOutgoingPoolAndRefund(ctx sdk.Context, txId uint64, se return sdkerrors.Wrapf(types.ErrInvalid, "Inconsistent tokens to cancel!: %s %s", tx.Erc20Fee.Contract, tx.Erc20Token.Contract) } - found := false - poolTx := k.GetPoolTransactions(ctx) - for _, pTx := range poolTx { - if pTx.Id == txId { - found = true - } - } - if !found { - return sdkerrors.Wrapf(types.ErrInvalid, "txId %d is not in unbatched pool! Must be in batch!", txId) - } - - // delete this tx from both indexes - err = k.removeFromUnbatchedTXIndex(ctx, *tx.Erc20Fee, txId) + // delete this tx from the pool + err = k.removeUnbatchedTX(ctx, *tx.Erc20Fee, txId) if err != nil { return sdkerrors.Wrapf(types.ErrInvalid, "txId %d not in unbatched index! Must be in a batch!", txId) } - k.removePoolEntry(ctx, txId) // reissue the amount and the fee - totalToRefund := tx.Erc20Token.GravityCoin() totalToRefund.Amount = totalToRefund.Amount.Add(tx.Erc20Fee.Amount) totalToRefundCoins := sdk.NewCoins(totalToRefund) @@ -170,140 +150,124 @@ func (k Keeper) RemoveFromOutgoingPoolAndRefund(ctx sdk.Context, txId uint64, se return nil } -// appendToUnbatchedTXIndex add at the end when tx with same fee exists -func (k Keeper) appendToUnbatchedTXIndex(ctx sdk.Context, tokenContract string, fee types.ERC20Token, txID uint64) { +// addUnbatchedTx creates a new transaction in the pool +func (k Keeper) addUnbatchedTX(ctx sdk.Context, val *types.OutgoingTransferTx) error { store := ctx.KVStore(k.storeKey) - idxKey := types.GetFeeSecondIndexKey(fee) - var idSet types.IDSet + idxKey := types.GetOutgoingTxPoolKey(*val.Erc20Fee, val.Id) if store.Has(idxKey) { - bz := store.Get(idxKey) - k.cdc.MustUnmarshalBinaryBare(bz, &idSet) - } - idSet.Ids = append(idSet.Ids, txID) - store.Set(idxKey, k.cdc.MustMarshalBinaryBare(&idSet)) -} - -// appendToUnbatchedTXIndex add at the top when tx with same fee exists -func (k Keeper) prependToUnbatchedTXIndex(ctx sdk.Context, tokenContract string, fee types.ERC20Token, txID uint64) { - store := ctx.KVStore(k.storeKey) - idxKey := types.GetFeeSecondIndexKey(fee) - var idSet types.IDSet - if store.Has(idxKey) { - bz := store.Get(idxKey) - k.cdc.MustUnmarshalBinaryBare(bz, &idSet) - } - idSet.Ids = append([]uint64{txID}, idSet.Ids...) - store.Set(idxKey, k.cdc.MustMarshalBinaryBare(&idSet)) -} - -// removeFromUnbatchedTXIndex removes the tx from the index and also removes it from the iterator -// GetPoolTransactions, making this tx implicitly invisible without a direct request. We remove a tx -// from the pool for good in OutgoingTxBatchExecuted, but if a batch is canceled or timed out we 'reactivate' -// an entry by adding it back to the second index. -func (k Keeper) removeFromUnbatchedTXIndex(ctx sdk.Context, fee types.ERC20Token, txID uint64) error { - store := ctx.KVStore(k.storeKey) - idxKey := types.GetFeeSecondIndexKey(fee) - var idSet types.IDSet - bz := store.Get(idxKey) - if bz == nil { - return sdkerrors.Wrap(types.ErrUnknown, "fee") + return sdkerrors.Wrap(types.ErrDuplicate, "transaction already in pool") } - k.cdc.MustUnmarshalBinaryBare(bz, &idSet) - for i := range idSet.Ids { - if idSet.Ids[i] == txID { - idSet.Ids = append(idSet.Ids[0:i], idSet.Ids[i+1:]...) - if len(idSet.Ids) != 0 { - store.Set(idxKey, k.cdc.MustMarshalBinaryBare(&idSet)) - } else { - store.Delete(idxKey) - } - return nil - } - } - return sdkerrors.Wrap(types.ErrUnknown, "tx id") -} -func (k Keeper) setPoolEntry(ctx sdk.Context, val *types.OutgoingTransferTx) error { bz, err := k.cdc.MarshalBinaryBare(val) if err != nil { return err } + + store.Set(idxKey, bz) + return err +} + +// removeUnbatchedTXIndex removes the tx from the pool +func (k Keeper) removeUnbatchedTX(ctx sdk.Context, fee types.ERC20Token, txID uint64) error { store := ctx.KVStore(k.storeKey) - store.Set(types.GetOutgoingTxPoolKey(val.Id), bz) + idxKey := types.GetOutgoingTxPoolKey(fee, txID) + if !store.Has(idxKey) { + return sdkerrors.Wrap(types.ErrUnknown, "pool transaction") + } + store.Delete(idxKey) return nil } -// getPoolEntry grabs an entry from the tx pool, this *does* include transactions in batches -// so check the UnbatchedTxIndex or call GetPoolTransactions for that purpose -func (k Keeper) getPoolEntry(ctx sdk.Context, id uint64) (*types.OutgoingTransferTx, error) { +// getUnbatchedTx grabs a tx from the pool given its fee and txID +func (k Keeper) GetUnbatchedTxByFeeAndId(ctx sdk.Context, fee types.ERC20Token, txID uint64) (*types.OutgoingTransferTx, error) { store := ctx.KVStore(k.storeKey) - bz := store.Get(types.GetOutgoingTxPoolKey(id)) + bz := store.Get(types.GetOutgoingTxPoolKey(fee, txID)) if bz == nil { - return nil, types.ErrUnknown + return nil, sdkerrors.Wrap(types.ErrUnknown, "pool transaction") } var r types.OutgoingTransferTx k.cdc.UnmarshalBinaryBare(bz, &r) return &r, nil } -// removePoolEntry removes an entry from the tx pool, this *does* include transactions in batches -// so you will need to run it when cleaning up after a executed batch -func (k Keeper) removePoolEntry(ctx sdk.Context, id uint64) { - store := ctx.KVStore(k.storeKey) - store.Delete(types.GetOutgoingTxPoolKey(id)) +// getUnbatchedTxById grabs a tx from the pool given only the txID +// note that due to the way unbatched txs are indexed, the GetUnbatchedTxByFeeAndId method is much faster +func (k Keeper) GetUnbatchedTxById(ctx sdk.Context, txID uint64) (*types.OutgoingTransferTx, error) { + var r *types.OutgoingTransferTx = nil + k.IterateUnbatchedTransactions(ctx, types.OutgoingTXPoolKey, func(_ []byte, tx *types.OutgoingTransferTx) bool { + if tx.Id == txID { + r = tx + return true + } + return false // iterating DESC, exit early + }) + + if r == nil { + // We have no return tx, it was either batched or never existed + return nil, sdkerrors.Wrap(types.ErrUnknown, "pool transaction") + } + return r, nil +} + +// GetUnbatchedTransactionsByContract, grabs all unbatched transactions from the tx pool for the given contract +// unbatched transactions are sorted by fee amount in DESC order +func (k Keeper) GetUnbatchedTransactionsByContract(ctx sdk.Context, contractAddress string) []*types.OutgoingTransferTx { + return k.collectUnbatchedTransactions(ctx, types.GetOutgoingTxPoolContractPrefix(contractAddress)) } // GetPoolTransactions, grabs all transactions from the tx pool, useful for queries or genesis save/load -// this does not include all transactions in batches, because it iterates using the second index key -func (k Keeper) GetPoolTransactions(ctx sdk.Context) []*types.OutgoingTransferTx { - prefixStore := ctx.KVStore(k.storeKey) - // we must use the second index key here because transactions are left in the store, but removed - // from the tx sorting key, while in batches - iter := prefixStore.ReverseIterator(prefixRange([]byte(types.SecondIndexOutgoingTXFeeKey))) - var ret []*types.OutgoingTransferTx - defer iter.Close() - for ; iter.Valid(); iter.Next() { - var ids types.IDSet - k.cdc.MustUnmarshalBinaryBare(iter.Value(), &ids) - for _, id := range ids.Ids { - tx, err := k.getPoolEntry(ctx, id) - if err != nil { - panic("Invalid id in tx index!") - } - ret = append(ret, tx) - } - } - return ret +func (k Keeper) GetUnbatchedTransactions(ctx sdk.Context) []*types.OutgoingTransferTx { + return k.collectUnbatchedTransactions(ctx, types.OutgoingTXPoolKey) +} + +// Aggregates all unbatched transactions in the store with a given prefix +func (k Keeper) collectUnbatchedTransactions(ctx sdk.Context, prefixKey []byte) (out []*types.OutgoingTransferTx) { + k.IterateUnbatchedTransactions(ctx, prefixKey, func(_ []byte, tx *types.OutgoingTransferTx) bool { + out = append(out, tx) + return false + }) + return +} + +// IterateUnbatchedTransactionsByContract, iterates through unbatched transactions from the tx pool for the given contract +// unbatched transactions are sorted by fee amount in DESC order +func (k Keeper) IterateUnbatchedTransactionsByContract(ctx sdk.Context, contractAddress string, cb func(key []byte, tx *types.OutgoingTransferTx) bool) { + k.IterateUnbatchedTransactions(ctx, types.GetOutgoingTxPoolContractPrefix(contractAddress), cb) } -// IterateOutgoingPoolByFee iterates over the outgoing pool which is sorted by fee -func (k Keeper) IterateOutgoingPoolByFee(ctx sdk.Context, contract string, cb func(uint64, *types.OutgoingTransferTx) bool) { - prefixStore := prefix.NewStore(ctx.KVStore(k.storeKey), types.SecondIndexOutgoingTXFeeKey) - iter := prefixStore.ReverseIterator(prefixRange([]byte(contract))) +// IterateUnbatchedTransactions iterates through all unbatched transactions whose keys begin with prefixKey in DESC order +func (k Keeper) IterateUnbatchedTransactions(ctx sdk.Context, prefixKey []byte, cb func(key []byte, tx *types.OutgoingTransferTx) bool) { + prefixStore := ctx.KVStore(k.storeKey) + iter := prefixStore.ReverseIterator(prefixRange(prefixKey)) defer iter.Close() for ; iter.Valid(); iter.Next() { - var ids types.IDSet - k.cdc.MustUnmarshalBinaryBare(iter.Value(), &ids) + var transact types.OutgoingTransferTx + k.cdc.MustUnmarshalBinaryBare(iter.Value(), &transact) // cb returns true to stop early - for _, id := range ids.Ids { - tx, err := k.getPoolEntry(ctx, id) - if err != nil { - panic("Invalid id in tx index!") - } - if cb(id, tx) { - return - } + if cb(iter.Key(), &transact) { + break } } } -// GetBatchFeesByTokenType gets the fees the next batch of a given token type would -// have if created. This info is both presented to relayers for the purpose of determining +// GetBatchFeeByTokenType gets the fee the next batch of a given token type would +// have if created right now. This info is both presented to relayers for the purpose of determining // when to request batches and also used by the batch creation process to decide not to create -// a new batch -func (k Keeper) GetBatchFeesByTokenType(ctx sdk.Context, tokenContractAddr string, maxElements uint) *types.BatchFees { - batchFeesMap := k.createBatchFees(ctx, maxElements) - return batchFeesMap[tokenContractAddr] +// a new batch (fees must be increasing) +func (k Keeper) GetBatchFeeByTokenType(ctx sdk.Context, tokenContractAddr string, maxElements uint) *types.BatchFees { + batchFee := types.BatchFees{Token: tokenContractAddr, TotalFees: sdk.NewInt(0)} + txCount := 0 + + k.IterateUnbatchedTransactions(ctx, types.GetOutgoingTxPoolContractPrefix(tokenContractAddr), func(_ []byte, tx *types.OutgoingTransferTx) bool { + fee := tx.Erc20Fee + if fee.Contract != tokenContractAddr { + panic(fmt.Errorf("unexpected fee contract %s when getting batch fees for contract %s", fee.Contract, tokenContractAddr)) + } + batchFee.TotalFees = batchFee.TotalFees.Add(fee.Amount) + txCount += 1 + return txCount < int(maxElements) + }) + return &batchFee } // GetAllBatchFees creates a fee entry for every batch type currently in the store @@ -324,51 +288,37 @@ func (k Keeper) GetAllBatchFees(ctx sdk.Context, maxElements uint) (batchFees [] return batchFees } -// CreateBatchFees iterates over the outgoing pool and creates batch token fee map +// CreateBatchFees iterates over the unbatched transaction pool and creates batch token fee map +// Implicitly creates batches with the highest potential fee because the transaction keys enforce an order which goes +// fee contract address -> fee amount -> transaction nonce func (k Keeper) createBatchFees(ctx sdk.Context, maxElements uint) map[string]*types.BatchFees { - prefixStore := prefix.NewStore(ctx.KVStore(k.storeKey), types.SecondIndexOutgoingTXFeeKey) - iter := prefixStore.Iterator(nil, nil) - defer iter.Close() - batchFeesMap := make(map[string]*types.BatchFees) txCountMap := make(map[string]int) - for ; iter.Valid(); iter.Next() { - var ids types.IDSet - k.cdc.MustUnmarshalBinaryBare(iter.Value(), &ids) - - // create a map to store the token contract address and its total fee - // Parse the iterator key to get contract address & fee - // If len(ids.Ids) > 1, multiply fee amount with len(ids.Ids) and add it to total fee amount - - key := iter.Key() - tokenContractBytes := key[:types.ETHContractAddressLen] - tokenContractAddr := string(tokenContractBytes) - - feeAmountBytes := key[len(tokenContractBytes):] - feeAmount := big.NewInt(0).SetBytes(feeAmountBytes) - - for i := 0; i < len(ids.Ids); i++ { - if txCountMap[tokenContractAddr] >= OutgoingTxBatchSize { - break - } else { - // add fee amount - if _, ok := batchFeesMap[tokenContractAddr]; ok { - batchFeesMap[tokenContractAddr].TotalFees = batchFeesMap[tokenContractAddr].TotalFees.Add(sdk.NewIntFromBigInt(feeAmount)) - } else { - batchFeesMap[tokenContractAddr] = &types.BatchFees{ - Token: tokenContractAddr, - TotalFees: sdk.NewIntFromBigInt(feeAmount)} - } - - txCountMap[tokenContractAddr] = txCountMap[tokenContractAddr] + 1 - } + k.IterateUnbatchedTransactions(ctx, types.OutgoingTXPoolKey, func(_ []byte, tx *types.OutgoingTransferTx) bool { + fee := tx.Erc20Fee + if txCountMap[fee.Contract] < int(maxElements) { + addFeeToMap(fee, batchFeesMap, txCountMap) } - } + return false + }) return batchFeesMap } +func addFeeToMap(fee *types.ERC20Token, batchFeesMap map[string]*types.BatchFees, txCountMap map[string]int) { + txCountMap[fee.Contract] = txCountMap[fee.Contract] + 1 + + // add fee amount + if _, ok := batchFeesMap[fee.Contract]; ok { + batchFeesMap[fee.Contract].TotalFees = batchFeesMap[fee.Contract].TotalFees.Add(fee.Amount) + } else { + batchFeesMap[fee.Contract] = &types.BatchFees{ + Token: fee.Contract, + TotalFees: fee.Amount} + } +} + func (k Keeper) autoIncrementID(ctx sdk.Context, idKey []byte) uint64 { store := ctx.KVStore(k.storeKey) bz := store.Get(idKey) diff --git a/module/x/gravity/keeper/pool_test.go b/module/x/gravity/keeper/pool_test.go index 2bbdf4c42..02d5c61b1 100644 --- a/module/x/gravity/keeper/pool_test.go +++ b/module/x/gravity/keeper/pool_test.go @@ -36,13 +36,16 @@ func TestAddToOutgoingPool(t *testing.T) { r, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee) require.NoError(t, err) t.Logf("___ response: %#v", r) + // Should create: + // 1: amount 100, fee 2 + // 2: amount 101, fee 3 + // 3: amount 102, fee 2 + // 4: amount 103, fee 1 + } // then - var got []*types.OutgoingTransferTx - input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool { - got = append(got, tx) - return false - }) + got := input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr) + exp := []*types.OutgoingTransferTx{ { Id: 2, @@ -52,18 +55,18 @@ func TestAddToOutgoingPool(t *testing.T) { Erc20Token: types.NewERC20Token(101, myTokenContractAddr), }, { - Id: 1, + Id: 3, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(100, myTokenContractAddr), + Erc20Token: types.NewERC20Token(102, myTokenContractAddr), }, { - Id: 3, + Id: 1, Erc20Fee: types.NewERC20Token(2, myTokenContractAddr), Sender: mySender.String(), DestAddress: myReceiver, - Erc20Token: types.NewERC20Token(102, myTokenContractAddr), + Erc20Token: types.NewERC20Token(100, myTokenContractAddr), }, { Id: 4, diff --git a/module/x/gravity/keeper/querier.go b/module/x/gravity/keeper/querier.go index 8cf24615b..9ac347ae5 100644 --- a/module/x/gravity/keeper/querier.go +++ b/module/x/gravity/keeper/querier.go @@ -519,7 +519,7 @@ func queryERC20ToDenom(ctx sdk.Context, ERC20 string, keeper Keeper) ([]byte, er func queryPendingSendToEth(ctx sdk.Context, senderAddr string, k Keeper) ([]byte, error) { batches := k.GetOutgoingTxBatches(ctx) - unbatched_tx := k.GetPoolTransactions(ctx) + unbatched_tx := k.GetUnbatchedTransactions(ctx) sender_address := senderAddr res := types.QueryPendingSendToEthResponse{ TransfersInBatches: []*types.OutgoingTransferTx{}, diff --git a/module/x/gravity/keeper/querier_test.go b/module/x/gravity/keeper/querier_test.go index ee153a068..57cfe2532 100644 --- a/module/x/gravity/keeper/querier_test.go +++ b/module/x/gravity/keeper/querier_test.go @@ -509,11 +509,11 @@ func TestLastPendingBatchRequest(t *testing.T) { } }, { - "id": "1", + "id": "3", "sender": "cosmos1qyqszqgpqyqszqgpqyqszqgpqyqszqgpjnp7du", "dest_address": "0x320915BD0F1bad11cBf06e85D5199DBcAC4E9934", "erc20_token": { - "amount": "100", + "amount": "102", "contract": "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B" }, "erc20_fee": { @@ -562,6 +562,11 @@ func createTestBatch(t *testing.T, input TestInput) { fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin() _, err = input.GravityKeeper.AddToOutgoingPool(input.Context, mySender, myReceiver, amount, fee) require.NoError(t, err) + // Should create: + // 1: amount 100, fee 2 + // 2: amount 101, fee 3 + // 3: amount 102, fee 2 + // 4: amount 103, fee 1 } // when input.Context = input.Context.WithBlockTime(now) @@ -569,6 +574,8 @@ func createTestBatch(t *testing.T, input TestInput) { // tx batch size is 2, so that some of them stay behind _, err = input.GravityKeeper.BuildOutgoingTXBatch(input.Context, myTokenContractAddr, 2) require.NoError(t, err) + // Should have 2 and 3 from above + // 1 and 4 should be unbatched } //nolint: exhaustivestruct @@ -753,11 +760,11 @@ func TestQueryBatch(t *testing.T) { }, "dest_address": "0x320915BD0F1bad11cBf06e85D5199DBcAC4E9934", "erc20_token": { - "amount": "100", + "amount": "102", "contract": "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B" }, "sender": "cosmos1qyqszqgpqyqszqgpqyqszqgpqyqszqgpjnp7du", - "id": "1" + "id": "3" } ], "batch_nonce": "1", @@ -809,7 +816,7 @@ func TestLastBatchesRequest(t *testing.T) { "contract": "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B" }, "sender": "cosmos1qyqszqgpqyqszqgpqyqszqgpqyqszqgpjnp7du", - "id": "3" + "id": "7" } ], "batch_nonce": "2", @@ -838,11 +845,11 @@ func TestLastBatchesRequest(t *testing.T) { }, "dest_address": "0x320915BD0F1bad11cBf06e85D5199DBcAC4E9934", "erc20_token": { - "amount": "100", + "amount": "102", "contract": "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B" }, "sender": "cosmos1qyqszqgpqyqszqgpqyqszqgpqyqszqgpjnp7du", - "id": "1" + "id": "3" } ], "batch_nonce": "1", @@ -948,15 +955,22 @@ func TestQueryPendingSendToEth(t *testing.T) { fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin() _, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee) require.NoError(t, err) + // Should create: + // 1: amount 100, fee 2 + // 2: amount 101, fee 3 + // 3: amount 102, fee 2 + // 4: amount 104, fee 1 } // when ctx = ctx.WithBlockTime(now) // tx batch size is 2, so that some of them stay behind + // Should contain 2 and 3 from above _, err := input.GravityKeeper.BuildOutgoingTXBatch(ctx, myTokenContractAddr, 2) require.NoError(t, err) + // Should receive 1 and 4 unbatched, 2 and 3 batched in response response, err := queryPendingSendToEth(ctx, mySender.String(), input.GravityKeeper) require.NoError(t, err) expectedJSON := []byte(`{ @@ -975,12 +989,12 @@ func TestQueryPendingSendToEth(t *testing.T) { } }, { - "id": "1", + "id": "3", "sender": "cosmos1ahx7f8wyertuus9r20284ej0asrs085case3kn", "dest_address": "0xd041c41EA1bf0F006ADBb6d2c9ef9D425dE5eaD7", "erc20_token": { "contract": "0x429881672B9AE42b8EbA0E26cD9C73711b891Ca5", - "amount": "100" + "amount": "102" }, "erc20_fee": { "contract": "0x429881672B9AE42b8EbA0E26cD9C73711b891Ca5", @@ -990,12 +1004,12 @@ func TestQueryPendingSendToEth(t *testing.T) { ], "unbatched_transfers": [ { - "id": "3", + "id": "1", "sender": "cosmos1ahx7f8wyertuus9r20284ej0asrs085case3kn", "dest_address": "0xd041c41EA1bf0F006ADBb6d2c9ef9D425dE5eaD7", "erc20_token": { "contract": "0x429881672B9AE42b8EbA0E26cD9C73711b891Ca5", - "amount": "102" + "amount": "100" }, "erc20_fee": { "contract": "0x429881672B9AE42b8EbA0E26cD9C73711b891Ca5", diff --git a/module/x/gravity/types/key.go b/module/x/gravity/types/key.go index 7067cefa1..e3935a496 100644 --- a/module/x/gravity/types/key.go +++ b/module/x/gravity/types/key.go @@ -56,9 +56,6 @@ var ( // DenomiatorPrefix indexes token contract addresses from ETH on gravity DenomiatorPrefix = []byte{0x8} - // SecondIndexOutgoingTXFeeKey indexes fee amounts by token contract address - SecondIndexOutgoingTXFeeKey = []byte{0x9} - // OutgoingTXBatchKey indexes outgoing tx batches under a nonce and token address OutgoingTXBatchKey = []byte{0xa} @@ -204,11 +201,27 @@ func GetAttestationKey(eventNonce uint64, claimHash []byte) []byte { return key } +// GetOutgoingTxPoolContractPrefix returns the following key format +// prefix feeContract +// [0x6][0xc783df8a850f42e7F7e57013759C285caa701eB6] +// This prefix is used for iterating over unbatched transactions for a given contract +func GetOutgoingTxPoolContractPrefix(contractAddress string) []byte { + return append(OutgoingTXPoolKey, []byte(contractAddress)...) +} + // GetOutgoingTxPoolKey returns the following key format -// prefix id -// [0x6][0 0 0 0 0 0 0 1] -func GetOutgoingTxPoolKey(id uint64) []byte { - return append(OutgoingTXPoolKey, sdk.Uint64ToBigEndian(id)...) +// prefix feeContract feeAmount id +// [0x6][0xc783df8a850f42e7F7e57013759C285caa701eB6][1000000000][0 0 0 0 0 0 0 1] +func GetOutgoingTxPoolKey(fee ERC20Token, id uint64) []byte { + // sdkInts have a size limit of 255 bits or 32 bytes + // therefore this will never panic and is always safe + amount := make([]byte, 32) + amount = fee.Amount.BigInt().FillBytes(amount) + + a := append(amount, UInt64Bytes(id)...) + b := append([]byte(fee.Contract), a...) + r := append(OutgoingTXPoolKey, b...) + return r } // GetOutgoingTxBatchKey returns the following key format @@ -236,22 +249,6 @@ func GetBatchConfirmKey(tokenContract string, batchNonce uint64, validator sdk.A return c } -// GetFeeSecondIndexKey returns the following key format -// prefix eth-contract-address fee_amount -// [0x9][0xc783df8a850f42e7F7e57013759C285caa701eB6][1000000000] -func GetFeeSecondIndexKey(fee ERC20Token) []byte { - r := make([]byte, 1+ETHContractAddressLen+32) - // sdkInts have a size limit of 255 bits or 32 bytes - // therefore this will never panic and is always safe - amount := make([]byte, 32) - amount = fee.Amount.BigInt().FillBytes(amount) - // TODO this won't ever work fix it - copy(r[0:], SecondIndexOutgoingTXFeeKey) - copy(r[len(SecondIndexOutgoingTXFeeKey):], []byte(fee.Contract)) - copy(r[len(SecondIndexOutgoingTXFeeKey)+len(fee.Contract):], amount) - return r -} - // GetLastEventNonceByValidatorKey indexes lateset event nonce by validator // GetLastEventNonceByValidatorKey returns the following key format // prefix cosmos-validator