Skip to content

Commit

Permalink
Switching tx pool index model
Browse files Browse the repository at this point in the history
Gives transactions a single index consisting of [fee contract + fee amount + tx nonce]
Transactions previously had two indices, one for unbatched transactions
and another for batched (but not yet relayed) transactions. Managing
both indices is error prone so we have opted to simplify.
  • Loading branch information
ChristianBorst authored and jkilpatr committed Aug 25, 2021
1 parent d380f0c commit 92d0e12
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 248 deletions.
17 changes: 7 additions & 10 deletions module/x/gravity/keeper/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (k Keeper) BuildOutgoingTXBatch(
if lastBatch != nil {
// this traverses the current tx pool for this token type and determines what
// fees a hypothetical batch would have if created
currentFees := k.GetBatchFeesByTokenType(ctx, contractAddress, maxElements)
currentFees := k.GetBatchFeeByTokenType(ctx, contractAddress, maxElements)
if currentFees == nil {
return nil, sdkerrors.Wrap(types.ErrInvalid, "error getting fees from tx pool")
}
Expand Down Expand Up @@ -105,11 +105,6 @@ func (k Keeper) OutgoingTxBatchExecuted(ctx sdk.Context, tokenContract string, n
panic(fmt.Sprintf("unknown batch nonce for outgoing tx batch %s %d", tokenContract, nonce))
}

// cleanup outgoing TX pool, while these transactions where hidden from GetPoolTransactions
// they still exist in the pool and need to be cleaned up.
for _, tx := range b.Transactions {
k.removePoolEntry(ctx, tx.Id)
}
// Iterate through remaining batches
k.IterateOutgoingTXBatches(ctx, func(key []byte, iter_batch *types.OutgoingTxBatch) bool {
// If the iterated batches nonce is lower than the one that was just executed, cancel it
Expand Down Expand Up @@ -163,10 +158,10 @@ func (k Keeper) pickUnbatchedTX(
maxElements uint) ([]*types.OutgoingTransferTx, error) {
var selectedTx []*types.OutgoingTransferTx
var err error
k.IterateOutgoingPoolByFee(ctx, contractAddress, func(txID uint64, tx *types.OutgoingTransferTx) bool {
k.IterateUnbatchedTransactionsByContract(ctx, contractAddress, func(_ []byte, tx *types.OutgoingTransferTx) bool {
if tx != nil && tx.Erc20Fee != nil {
selectedTx = append(selectedTx, tx)
err = k.removeFromUnbatchedTXIndex(ctx, *tx.Erc20Fee, txID)
err = k.removeUnbatchedTX(ctx, *tx.Erc20Fee, tx.Id)
return err != nil || uint(len(selectedTx)) == maxElements
} else {
panic("tx and fee should never be nil!")
Expand Down Expand Up @@ -199,8 +194,10 @@ func (k Keeper) CancelOutgoingTXBatch(ctx sdk.Context, tokenContract string, non
return types.ErrUnknown
}
for _, tx := range batch.Transactions {
tx.Erc20Fee.Contract = tokenContract
k.prependToUnbatchedTXIndex(ctx, tokenContract, *tx.Erc20Fee, tx.Id)
err := k.addUnbatchedTX(ctx, tx)
if err != nil {
panic(sdkerrors.Wrapf(err, "unable to add batched transaction back into pool %v", tx))
}
}

// Delete batch since it is finished
Expand Down
69 changes: 39 additions & 30 deletions module/x/gravity/keeper/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package keeper

import (
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -41,6 +42,12 @@ func TestBatches(t *testing.T) {
fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin()
_, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee)
require.NoError(t, err)
ctx.Logger().Info(fmt.Sprintf("Created transaction %v with amount %v and fee %v", i, amount, fee))
// Should create:
// 1: tx amount is 100, fee is 2, id is 1
// 2: tx amount is 101, fee is 3, id is 2
// 3: tx amount is 102, fee is 2, id is 3
// 4: tx amount is 103, fee is 1, id is 4
}

// when
Expand All @@ -53,6 +60,8 @@ func TestBatches(t *testing.T) {
// then batch is persisted
gotFirstBatch := input.GravityKeeper.GetOutgoingTXBatch(ctx, firstBatch.TokenContract, firstBatch.BatchNonce)
require.NotNil(t, gotFirstBatch)
// Should have txs 2: and 3: from above, as ties in fees are broken by transaction index
ctx.Logger().Info(fmt.Sprintf("found batch %+v", gotFirstBatch))

expFirstBatch := &types.OutgoingTxBatch{
BatchNonce: 1,
Expand All @@ -65,11 +74,11 @@ func TestBatches(t *testing.T) {
Erc20Token: types.NewERC20Token(101, myTokenContractAddr),
},
{
Id: 1,
Id: 3,
Erc20Fee: types.NewERC20Token(2, myTokenContractAddr),
Sender: mySender.String(),
DestAddress: myReceiver,
Erc20Token: types.NewERC20Token(100, myTokenContractAddr),
Erc20Token: types.NewERC20Token(102, myTokenContractAddr),
},
},
TokenContract: myTokenContractAddr,
Expand All @@ -78,18 +87,15 @@ func TestBatches(t *testing.T) {
assert.Equal(t, expFirstBatch, gotFirstBatch)

// and verify remaining available Tx in the pool
var gotUnbatchedTx []*types.OutgoingTransferTx
input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool {
gotUnbatchedTx = append(gotUnbatchedTx, tx)
return false
})
// Should still have 1: and 4: above
gotUnbatchedTx := input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr)
expUnbatchedTx := []*types.OutgoingTransferTx{
{
Id: 3,
Id: 1,
Erc20Fee: types.NewERC20Token(2, myTokenContractAddr),
Sender: mySender.String(),
DestAddress: myReceiver,
Erc20Token: types.NewERC20Token(102, myTokenContractAddr),
Erc20Token: types.NewERC20Token(100, myTokenContractAddr),
},
{
Id: 4,
Expand All @@ -111,6 +117,9 @@ func TestBatches(t *testing.T) {
fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin()
_, err = input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee)
require.NoError(t, err)
// Creates the following:
// 5: amount 100, fee 4, id 5
// 6: amount 101, fee 5, id 6
}

// create the more profitable batch
Expand All @@ -120,6 +129,7 @@ func TestBatches(t *testing.T) {
require.NoError(t, err)

// check that the more profitable batch has the right txs in it
// Should only have 5: and 6: above
expSecondBatch := &types.OutgoingTxBatch{
BatchNonce: 2,
Transactions: []*types.OutgoingTransferTx{
Expand Down Expand Up @@ -155,11 +165,7 @@ func TestBatches(t *testing.T) {
require.Nil(t, gotSecondBatch)

// check that txs from first batch have been freed
gotUnbatchedTx = nil
input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool {
gotUnbatchedTx = append(gotUnbatchedTx, tx)
return false
})
gotUnbatchedTx = input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr)
expUnbatchedTx = []*types.OutgoingTransferTx{
{
Id: 2,
Expand All @@ -169,18 +175,18 @@ func TestBatches(t *testing.T) {
Erc20Token: types.NewERC20Token(101, myTokenContractAddr),
},
{
Id: 1,
Id: 3,
Erc20Fee: types.NewERC20Token(2, myTokenContractAddr),
Sender: mySender.String(),
DestAddress: myReceiver,
Erc20Token: types.NewERC20Token(100, myTokenContractAddr),
Erc20Token: types.NewERC20Token(102, myTokenContractAddr),
},
{
Id: 3,
Id: 1,
Erc20Fee: types.NewERC20Token(2, myTokenContractAddr),
Sender: mySender.String(),
DestAddress: myReceiver,
Erc20Token: types.NewERC20Token(102, myTokenContractAddr),
Erc20Token: types.NewERC20Token(100, myTokenContractAddr),
},
{
Id: 4,
Expand Down Expand Up @@ -264,11 +270,7 @@ func TestBatchesFullCoins(t *testing.T) {
assert.Equal(t, expFirstBatch, gotFirstBatch)

// and verify remaining available Tx in the pool
var gotUnbatchedTx []*types.OutgoingTransferTx
input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool {
gotUnbatchedTx = append(gotUnbatchedTx, tx)
return false
})
gotUnbatchedTx := input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr)
expUnbatchedTx := []*types.OutgoingTransferTx{
{
Id: 1,
Expand Down Expand Up @@ -341,11 +343,7 @@ func TestBatchesFullCoins(t *testing.T) {
require.Nil(t, gotSecondBatch)

// check that txs from first batch have been freed
gotUnbatchedTx = nil
input.GravityKeeper.IterateOutgoingPoolByFee(ctx, myTokenContractAddr, func(_ uint64, tx *types.OutgoingTransferTx) bool {
gotUnbatchedTx = append(gotUnbatchedTx, tx)
return false
})
gotUnbatchedTx = input.GravityKeeper.GetUnbatchedTransactionsByContract(ctx, myTokenContractAddr)
expUnbatchedTx = []*types.OutgoingTransferTx{
{
Id: 2,
Expand Down Expand Up @@ -489,23 +487,34 @@ func TestPoolTxRefund(t *testing.T) {
fee := types.NewERC20Token(v, myTokenContractAddr).GravityCoin()
_, err := input.GravityKeeper.AddToOutgoingPool(ctx, mySender, myReceiver, amount, fee)
require.NoError(t, err)
// Should have created:
// 1: amount 100, fee 2
// 2: amount 101, fee 3
// 3: amount 102, fee 2
// 4: amount 103, fee 1
}

// when
ctx = ctx.WithBlockTime(now)

// tx batch size is 2, so that some of them stay behind
_, err := input.GravityKeeper.BuildOutgoingTXBatch(ctx, myTokenContractAddr, 2)
// Should have 2: and 3: from above
batch, err := input.GravityKeeper.BuildOutgoingTXBatch(ctx, myTokenContractAddr, 2)
batch = batch
unbatched := input.GravityKeeper.GetUnbatchedTransactions(ctx)
unbatched = unbatched
require.NoError(t, err)

// try to refund a tx that's in a batch
err1 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 1, mySender)
err1 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 3, mySender)
require.Error(t, err1)

// try to refund somebody else's tx
err2 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 4, notMySender)
require.Error(t, err2)

prebalances := input.BankKeeper.GetAllBalances(ctx, mySender)
prebalances = prebalances
// try to refund a tx that's in the pool
err3 := input.GravityKeeper.RemoveFromOutgoingPoolAndRefund(ctx, 4, mySender)
require.NoError(t, err3)
Expand Down
4 changes: 2 additions & 2 deletions module/x/gravity/keeper/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func InitGenesis(ctx sdk.Context, k Keeper, data types.GenesisState) {

// reset pool transactions in state
for _, tx := range data.UnbatchedTransfers {
if err := k.setPoolEntry(ctx, tx); err != nil {
if err := k.addUnbatchedTX(ctx, tx); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func ExportGenesis(ctx sdk.Context, k Keeper) types.GenesisState {
delegates = k.GetDelegateKeys(ctx)
lastobserved = k.GetLastObservedEventNonce(ctx)
erc20ToDenoms = []*types.ERC20ToDenom{}
unbatchedTransfers = k.GetPoolTransactions(ctx)
unbatchedTransfers = k.GetUnbatchedTransactions(ctx)
)

// export valset confirmations from state
Expand Down
2 changes: 1 addition & 1 deletion module/x/gravity/keeper/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (k Keeper) GetPendingSendToEth(
req *types.QueryPendingSendToEth) (*types.QueryPendingSendToEthResponse, error) {
ctx := sdk.UnwrapSDKContext(c)
batches := k.GetOutgoingTxBatches(ctx)
unbatchedTx := k.GetPoolTransactions(ctx)
unbatchedTx := k.GetUnbatchedTransactions(ctx)
senderAddress := req.SenderAddress
var res *types.QueryPendingSendToEthResponse

Expand Down
Loading

0 comments on commit 92d0e12

Please sign in to comment.