From 323f427831b5d6a508f58f92f562a5e9ff3ca8d8 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Wed, 5 Feb 2020 01:06:05 +0100 Subject: [PATCH] swap, swap/txqueue, contract: add transaction queue --- contracts/swap/factory.go | 7 +- contracts/swap/swap.go | 58 +-- swap/cashout.go | 136 +++--- swap/cashout_test.go | 31 +- swap/common_test.go | 33 +- swap/protocol_test.go | 5 +- swap/simulations_test.go | 20 +- swap/swap.go | 17 +- swap/swap_test.go | 5 +- swap/txqueue/txqueue.go | 790 +++++++++++++++++++++++++++++++++++ swap/txqueue/txqueue_test.go | 305 ++++++++++++++ 11 files changed, 1257 insertions(+), 150 deletions(-) create mode 100644 swap/txqueue/txqueue.go create mode 100644 swap/txqueue/txqueue_test.go diff --git a/contracts/swap/factory.go b/contracts/swap/factory.go index 5c3c7a0be1..d8cb5443b5 100644 --- a/contracts/swap/factory.go +++ b/contracts/swap/factory.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" chequebookFactory "github.com/ethersphere/go-sw3/contracts-v0-2-0/simpleswapfactory" + "github.com/ethersphere/swarm/swap/txqueue" ) var ( @@ -26,7 +27,7 @@ var ( type simpleSwapFactory struct { instance *chequebookFactory.SimpleSwapFactory address common.Address - backend Backend + backend txqueue.Backend } // SimpleSwapFactory interface defines the methods available for a factory contract for SimpleSwap @@ -40,7 +41,7 @@ type SimpleSwapFactory interface { } // FactoryAt creates a SimpleSwapFactory instance for the given address and backend -func FactoryAt(address common.Address, backend Backend) (SimpleSwapFactory, error) { +func FactoryAt(address common.Address, backend txqueue.Backend) (SimpleSwapFactory, error) { simple, err := chequebookFactory.NewSimpleSwapFactory(address, backend) if err != nil { return nil, err @@ -83,7 +84,7 @@ func (sf simpleSwapFactory) DeploySimpleSwap(auth *bind.TransactOpts, issuer com return nil, err } - receipt, err := WaitFunc(auth.Context, sf.backend, tx) + receipt, err := txqueue.WaitFunc(auth.Context, sf.backend, tx.Hash()) if err != nil { return nil, err } diff --git a/contracts/swap/swap.go b/contracts/swap/swap.go index defbf55d91..344bcac36b 100644 --- a/contracts/swap/swap.go +++ b/contracts/swap/swap.go @@ -20,7 +20,6 @@ package swap import ( - "context" "errors" "fmt" "math/big" @@ -29,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" contract "github.com/ethersphere/go-sw3/contracts-v0-2-0/erc20simpleswap" + "github.com/ethersphere/swarm/swap/txqueue" "github.com/ethersphere/swarm/uint256" ) @@ -37,13 +37,6 @@ var ( ErrTransactionReverted = errors.New("Transaction reverted") ) -// Backend wraps all methods required for contract deployment. -type Backend interface { - bind.ContractBackend - TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) -} - // Contract interface defines the methods exported from the underlying go-bindings for the smart contract type Contract interface { // Withdraw attempts to withdraw ERC20-token from the chequebook @@ -89,13 +82,13 @@ type Params struct { type simpleContract struct { instance *contract.ERC20SimpleSwap address common.Address - backend Backend + backend txqueue.Backend } // InstanceAt creates a new instance of a contract at a specific address. // It assumes that there is an existing contract instance at the given address, or an error is returned // This function is needed to communicate with remote Swap contracts (e.g. sending a cheque) -func InstanceAt(address common.Address, backend Backend) (Contract, error) { +func InstanceAt(address common.Address, backend txqueue.Backend) (Contract, error) { instance, err := contract.NewERC20SimpleSwap(address, backend) if err != nil { return nil, err @@ -110,7 +103,7 @@ func (s simpleContract) Withdraw(auth *bind.TransactOpts, amount *big.Int) (*typ if err != nil { return nil, err } - return WaitFunc(auth.Context, s.backend, tx) + return txqueue.WaitFunc(auth.Context, s.backend, tx.Hash()) } // Deposit sends an amount in ERC20 token to the chequebook and blocks until the transaction is mined @@ -140,7 +133,7 @@ func (s simpleContract) Deposit(auth *bind.TransactOpts, amount *big.Int) (*type if err != nil { return nil, err } - return WaitFunc(auth.Context, s.backend, tx) + return txqueue.WaitFunc(auth.Context, s.backend, tx.Hash()) } // CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary @@ -224,44 +217,3 @@ func (s simpleContract) Issuer(opts *bind.CallOpts) (common.Address, error) { func (s simpleContract) PaidOut(opts *bind.CallOpts, addr common.Address) (*big.Int, error) { return s.instance.PaidOut(opts, addr) } - -// WaitFunc is the default function to wait for transactions -// We can overwrite this in tests so that we don't need to wait for mining -var WaitFunc = waitForTx - -// waitForTx waits for transaction to be mined and returns the receipt -func waitForTx(ctx context.Context, backend Backend, tx *types.Transaction) (*types.Receipt, error) { - // it blocks here until tx is mined - receipt, err := bind.WaitMined(ctx, backend, tx) - if err != nil { - return nil, err - } - // indicate whether the transaction did not revert - if receipt.Status != types.ReceiptStatusSuccessful { - return nil, ErrTransactionReverted - } - return receipt, nil -} - -// WaitForTransactionByHash waits for a transaction to by mined by hash -func WaitForTransactionByHash(ctx context.Context, backend Backend, txHash common.Hash) (*types.Receipt, error) { - tx, pending, err := backend.TransactionByHash(ctx, txHash) - if err != nil { - return nil, err - } - - var receipt *types.Receipt - if pending { - receipt, err = WaitFunc(ctx, backend, tx) - if err != nil { - return nil, err - } - } else { - receipt, err = backend.TransactionReceipt(ctx, txHash) - if err != nil { - return nil, err - } - } - - return receipt, nil -} diff --git a/swap/cashout.go b/swap/cashout.go index 1b15383e12..c3ef06437b 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -18,11 +18,13 @@ package swap import ( "context" "crypto/ecdsa" + "errors" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" contract "github.com/ethersphere/swarm/contracts/swap" + "github.com/ethersphere/swarm/swap/txqueue" "github.com/ethersphere/swarm/uint256" ) @@ -31,8 +33,9 @@ const CashChequeBeneficiaryTransactionCost = 50000 // CashoutProcessor holds all relevant fields needed for processing cashouts type CashoutProcessor struct { - backend contract.Backend // ethereum backend to use - privateKey *ecdsa.PrivateKey // private key to use + backend txqueue.Backend // ethereum backend to use + transactionQueue *txqueue.TxQueue // transaction queue to use + chequeCashedChan chan *CashoutRequest } // CashoutRequest represents a request for a cashout operation @@ -41,42 +44,26 @@ type CashoutRequest struct { Destination common.Address // destination for the payout } -// ActiveCashout stores the necessary information for a cashout in progess -type ActiveCashout struct { - Request CashoutRequest // the request that caused this cashout - TransactionHash common.Hash // the hash of the current transaction for this request -} - // newCashoutProcessor creates a new instance of CashoutProcessor -func newCashoutProcessor(backend contract.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { - return &CashoutProcessor{ - backend: backend, - privateKey: privateKey, +func newCashoutProcessor(transactionQueue *txqueue.TxQueue, backend txqueue.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { + c := &CashoutProcessor{ + backend: backend, + transactionQueue: transactionQueue, } + + transactionQueue.SetComponent("cashout", c) + return c +} + +func (c *CashoutProcessor) SetChequeCashedChan(chequeCashedChan chan *CashoutRequest) { + c.chequeCashedChan = chequeCashedChan } // cashCheque tries to cash the cheque specified in the request // after the transaction is sent it waits on its success func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error { - cheque := request.Cheque - opts := bind.NewKeyedTransactor(c.privateKey) - opts.Context = ctx - - otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) - if err != nil { - return err - } - - tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) - if err != nil { - return err - } - - // this blocks until the cashout has been successfully processed - return c.waitForAndProcessActiveCashout(&ActiveCashout{ - Request: *request, - TransactionHash: tx.Hash(), - }) + _, err := c.transactionQueue.QueueRequest("cashout", "CashoutRequest", request) + return err } // estimatePayout estimates the payout for a given cheque as well as the transaction cost @@ -123,30 +110,79 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) ( return expectedPayout, transactionCosts, nil } -// waitForAndProcessActiveCashout waits for activeCashout to complete -func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error { - ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout) - defer cancel() - - receipt, err := contract.WaitForTransactionByHash(ctx, c.backend, activeCashout.TransactionHash) - if err != nil { - return err +func (*CashoutProcessor) GetTypeInstance(requestType string) txqueue.TransactionRequest { + switch requestType { + case "CashoutRequest": + return &CashoutRequest{} + default: + return nil } +} - otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend) - if err != nil { - return err - } +func (*CashoutProcessor) SendTransactionRequest(id uint64, request txqueue.TransactionRequest, backend txqueue.Backend, opts *bind.TransactOpts) (common.Hash, error) { + switch request := request.(type) { + case *CashoutRequest: + cheque := request.Cheque - result := otherSwap.CashChequeBeneficiaryResult(receipt) + otherSwap, err := contract.InstanceAt(cheque.Contract, backend) + if err != nil { + return common.Hash{}, err + } - metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) + tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + if err != nil { + return common.Hash{}, err + } - if result.Bounced { - metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) - swapLog.Warn("cheque bounced", "tx", receipt.TxHash) + return tx.Hash(), nil + default: + return common.Hash{}, errors.New("unknown type") } +} - swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey) - return nil +func (c *CashoutProcessor) HandleNotification(id uint64, notification interface{}) error { + switch notification.(type) { + case *txqueue.TransactionReceiptNotification: + notification := notification.(*txqueue.TransactionReceiptNotification) + requestInfo, err := c.transactionQueue.GetRequestInfo(id) + if err != nil { + return err + } + + cashoutRequest, ok := requestInfo.Request.(*CashoutRequest) + if !ok { + return nil + } + otherSwap, err := contract.InstanceAt(cashoutRequest.Cheque.Contract, c.backend) + if err != nil { + return err + } + + receipt := ¬ification.Receipt + + if receipt.Status == 0 { + swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + return nil + } + + result := otherSwap.CashChequeBeneficiaryResult(receipt) + + metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) + + if result.Bounced { + metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) + swapLog.Warn("cheque bounced", "tx", receipt.TxHash) + } + + swapLog.Info("cheque cashed", "honey", cashoutRequest.Cheque.Honey) + + select { + case c.chequeCashedChan <- cashoutRequest: + default: + } + + return nil + default: + return nil + } } diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 27bcc46b6b..bb27c10e10 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -19,10 +19,12 @@ package swap import ( "context" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/log" - contract "github.com/ethersphere/swarm/contracts/swap" + "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/txqueue" "github.com/ethersphere/swarm/uint256" ) @@ -55,7 +57,7 @@ func TestContractIntegration(t *testing.T) { t.Fatal(err) } - receipt, err := contract.WaitForTransactionByHash(context.Background(), backend, tx.Hash()) + receipt, err := txqueue.WaitFunc(context.Background(), backend, tx.Hash()) if err != nil { t.Fatal(err) } @@ -99,7 +101,7 @@ func TestContractIntegration(t *testing.T) { t.Fatal(err) } - receipt, err = contract.WaitForTransactionByHash(context.Background(), backend, tx.Hash()) + receipt, err = txqueue.WaitFunc(context.Background(), backend, tx.Hash()) if err != nil { t.Fatal(err) } @@ -120,7 +122,12 @@ func TestCashCheque(t *testing.T) { reset := setupContractTest() defer reset() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) + store := state.NewInmemoryStore() + defer store.Close() + transactionQueue := txqueue.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey) payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) @@ -133,6 +140,10 @@ func TestCashCheque(t *testing.T) { t.Fatal(err) } + backend.cashChequeDone = make(chan *CashoutRequest) + defer close(backend.cashChequeDone) + cashoutProcessor.SetChequeCashedChan(backend.cashChequeDone) + err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ Cheque: *testCheque, Destination: ownerAddress, @@ -141,6 +152,11 @@ func TestCashCheque(t *testing.T) { t.Fatal(err) } + select { + case <-backend.cashChequeDone: + case <-time.After(5 * time.Second): + } + paidOut, err := chequebook.PaidOut(nil, ownerAddress) if err != nil { t.Fatal(err) @@ -158,7 +174,12 @@ func TestEstimatePayout(t *testing.T) { reset := setupContractTest() defer reset() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) + store := state.NewInmemoryStore() + defer store.Close() + transactionQueue := txqueue.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey) payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) diff --git a/swap/common_test.go b/swap/common_test.go index 124bbbb7c7..33e045955b 100644 --- a/swap/common_test.go +++ b/swap/common_test.go @@ -24,6 +24,7 @@ import ( "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/txqueue" "github.com/ethersphere/swarm/uint256" ) @@ -34,7 +35,7 @@ type swapTestBackend struct { factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network tokenAddress common.Address // address of the token in the simulated network // the async cashing go routine needs synchronization for tests - cashDone chan struct{} + cashChequeDone chan *CashoutRequest } func (b *swapTestBackend) Close() error { @@ -132,6 +133,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend) usedBackend = newTestBackend(t) } swap, dir := newBaseTestSwap(t, key, usedBackend) + swap.transactionQueue.Start() clean := func() { swap.Close() // only close if created by newTestSwap to avoid double close @@ -212,21 +214,8 @@ func newRandomTestCheque() *Cheque { return cheque } -// During tests, because the cashing in of cheques is async, we should wait for the function to be returned -// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`, -// causing subsequent TX to possibly fail due to nonce mismatch -func testCashCheque(s *Swap, cheque *Cheque) { - cashCheque(s, cheque) - // send to the channel, signals to clients that this function actually finished - if stb, ok := s.backend.(*swapTestBackend); ok { - if stb.cashDone != nil { - stb.cashDone <- struct{}{} - } - } -} - // when testing, we don't need to wait for a transaction to be mined -func testWaitForTx(ctx context.Context, backend cswap.Backend, tx *types.Transaction) (*types.Receipt, error) { +func testWaitForTx(ctx context.Context, backend txqueue.Backend, hash common.Hash) (*types.Receipt, error) { var stb *swapTestBackend var ok bool @@ -235,7 +224,7 @@ func testWaitForTx(ctx context.Context, backend cswap.Backend, tx *types.Transac } stb.Commit() - receipt, err := backend.TransactionReceipt(ctx, tx.Hash()) + receipt, err := backend.TransactionReceipt(ctx, hash) if err != nil { return nil, err } @@ -250,21 +239,17 @@ func testWaitForTx(ctx context.Context, backend cswap.Backend, tx *types.Transac func setupContractTest() func() { // we overwrite the waitForTx function with one which the simulated backend // immediately commits - currentWaitFunc := cswap.WaitFunc - // we also need to store the previous cashCheque function in case this is called multiple times - currentCashCheque := defaultCashCheque - defaultCashCheque = testCashCheque + currentWaitFunc := txqueue.WaitFunc // overwrite only for the duration of the test, so... - cswap.WaitFunc = testWaitForTx + txqueue.WaitFunc = testWaitForTx return func() { // ...we need to set it back to original when done - cswap.WaitFunc = currentWaitFunc - defaultCashCheque = currentCashCheque + txqueue.WaitFunc = currentWaitFunc } } // deploy for testing (needs simulated backend commit) -func testDeployWithPrivateKey(ctx context.Context, backend cswap.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) { +func testDeployWithPrivateKey(ctx context.Context, backend txqueue.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) { opts := bind.NewKeyedTransactor(privateKey) opts.Context = ctx diff --git a/swap/protocol_test.go b/swap/protocol_test.go index 49dfa2c3bc..5e0199b46a 100644 --- a/swap/protocol_test.go +++ b/swap/protocol_test.go @@ -237,7 +237,8 @@ func TestEmitCheque(t *testing.T) { cleanup := setupContractTest() defer cleanup() // now we need to create the channel... - testBackend.cashDone = make(chan struct{}) + testBackend.cashChequeDone = make(chan *CashoutRequest) + creditorSwap.cashoutProcessor.SetChequeCashedChan(testBackend.cashChequeDone) log.Debug("deploy to simulated backend") @@ -319,7 +320,7 @@ func TestEmitCheque(t *testing.T) { // we wait until the cashCheque is actually terminated (ensures proper nounce count) select { - case <-creditorSwap.backend.(*swapTestBackend).cashDone: + case <-testBackend.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transaction to complete") diff --git a/swap/simulations_test.go b/swap/simulations_test.go index 581a2e222e..922e65b10a 100644 --- a/swap/simulations_test.go +++ b/swap/simulations_test.go @@ -61,6 +61,7 @@ For integration tests, run test cluster deployments with all integration moduele (blockchains, oracles, etc.) */ // swapSimulationParams allows to avoid global variables for the test + type swapSimulationParams struct { swaps map[int]*Swap dirs map[int]string @@ -164,9 +165,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic if err != nil { return nil, nil, err } + ts.swap.cashoutProcessor.transactionQueue.Start() cleanup = func() { - ts.swap.store.Close() + ts.swap.Close() os.RemoveAll(dir) } @@ -268,8 +270,8 @@ func TestPingPongChequeSimulation(t *testing.T) { cleanup := setupContractTest() defer cleanup() - params.backend.cashDone = make(chan struct{}, 1) - defer close(params.backend.cashDone) + params.backend.cashChequeDone = make(chan *CashoutRequest, 1) + defer close(params.backend.cashChequeDone) // initialize the simulation sim := simulation.NewBzzInProc(newSimServiceMap(params), false) @@ -299,6 +301,9 @@ func TestPingPongChequeSimulation(t *testing.T) { ts1 := sim.Service("swap", p1).(*testService) ts2 := sim.Service("swap", p2).(*testService) + ts1.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone) + ts2.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone) + var ts1Len, ts2Len, ts1sLen, ts2sLen int timeout := time.After(10 * time.Second) @@ -398,8 +403,8 @@ func TestMultiChequeSimulation(t *testing.T) { cleanup := setupContractTest() defer cleanup() - params.backend.cashDone = make(chan struct{}, 1) - defer close(params.backend.cashDone) + params.backend.cashChequeDone = make(chan *CashoutRequest, 1) + defer close(params.backend.cashChequeDone) // initialize the simulation sim := simulation.NewBzzInProc(newSimServiceMap(params), false) defer sim.Close() @@ -431,6 +436,9 @@ func TestMultiChequeSimulation(t *testing.T) { // get the testService for the creditor creditorSvc := sim.Service("swap", creditor).(*testService) + debitorSvc.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone) + creditorSvc.swap.cashoutProcessor.SetChequeCashedChan(params.backend.cashChequeDone) + var debLen, credLen, debSwapLen, credSwapLen int timeout := time.After(10 * time.Second) for { @@ -726,7 +734,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr lock.Unlock() wg.Done() return - case <-backend.cashDone: + case <-backend.cashChequeDone: wg.Done() return } diff --git a/swap/swap.go b/swap/swap.go index bd2d5c8dd9..702d41806d 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -39,6 +39,7 @@ import ( "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/txqueue" "github.com/ethersphere/swarm/uint256" ) @@ -60,13 +61,14 @@ type Swap struct { peers map[enode.ID]*Peer // map of all swap Peers peersLock sync.RWMutex // lock for peers map owner *Owner // contract access - backend contract.Backend // the backend (blockchain) used + backend txqueue.Backend // the backend (blockchain) used chainID uint64 // id of the chain the backend is connected to params *Params // economic and operational parameters contract contract.Contract // reference to the smart contract chequebookFactory contract.SimpleSwapFactory // the chequebook factory used honeyPriceOracle HoneyOracle // oracle which resolves the price of honey (in Wei) cashoutProcessor *CashoutProcessor // processor for cashing out + transactionQueue *txqueue.TxQueue // transaction queue to use } // Owner encapsulates information related to accessing the contract @@ -135,7 +137,8 @@ func swapRotatingFileHandler(logdir string) (log.Handler, error) { } // newSwapInstance is a swap constructor function without integrity checks -func newSwapInstance(stateStore state.Store, owner *Owner, backend contract.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory) *Swap { +func newSwapInstance(stateStore state.Store, owner *Owner, backend txqueue.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory) *Swap { + transactionQueue := txqueue.NewTxQueue(stateStore, "txqueue", backend, owner.privateKey) return &Swap{ store: stateStore, peers: make(map[enode.ID]*Peer), @@ -145,7 +148,8 @@ func newSwapInstance(stateStore state.Store, owner *Owner, backend contract.Back chequebookFactory: chequebookFactory, honeyPriceOracle: NewHoneyPriceOracle(), chainID: chainID, - cashoutProcessor: newCashoutProcessor(backend, owner.privateKey), + cashoutProcessor: newCashoutProcessor(transactionQueue, backend, owner.privateKey), + transactionQueue: transactionQueue, } } @@ -213,6 +217,8 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par return nil, err } + swap.transactionQueue.Start() + // deposit money in the chequebook if desired if !skipDepositFlag { // prompt the user for a depositAmount @@ -246,7 +252,7 @@ const ( ) // createFactory determines the factory address and returns and error if no factory address has been specified or is unknown for the network -func createFactory(factoryAddress common.Address, chainID *big.Int, backend contract.Backend) (factory swap.SimpleSwapFactory, err error) { +func createFactory(factoryAddress common.Address, chainID *big.Int, backend txqueue.Backend) (factory swap.SimpleSwapFactory, err error) { if (factoryAddress == common.Address{}) { if factoryAddress, err = contract.FactoryAddressForNetwork(chainID.Uint64()); err != nil { return nil, err @@ -497,7 +503,7 @@ func cashCheque(s *Swap, cheque *Cheque) { if err != nil { metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - swapLog.Error("cashing cheque:", err) + swapLog.Error("cashing cheque:", "error", err) } } @@ -603,6 +609,7 @@ func (s *Swap) saveBalance(p enode.ID, balance int64) error { // Close cleans up swap func (s *Swap) Close() error { + s.transactionQueue.Stop() return s.store.Close() } diff --git a/swap/swap_test.go b/swap/swap_test.go index 225bd85038..4336d1dd92 100644 --- a/swap/swap_test.go +++ b/swap/swap_test.go @@ -662,7 +662,8 @@ func TestResetBalance(t *testing.T) { Cheque: cheque, } // now we need to create the channel... - testBackend.cashDone = make(chan struct{}) + testBackend.cashChequeDone = make(chan *CashoutRequest) + creditorSwap.cashoutProcessor.SetChequeCashedChan(testBackend.cashChequeDone) // ...and trigger message handling on the receiver side (creditor) // remember that debitor is the model of the remote node for the creditor... err = creditorSwap.handleEmitChequeMsg(ctx, debitor, msg) @@ -671,7 +672,7 @@ func TestResetBalance(t *testing.T) { } // ...on which we wait until the cashCheque is actually terminated (ensures proper nounce count) select { - case <-testBackend.cashDone: + case <-testBackend.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transactions to complete") diff --git a/swap/txqueue/txqueue.go b/swap/txqueue/txqueue.go new file mode 100644 index 0000000000..868d7d7a0d --- /dev/null +++ b/swap/txqueue/txqueue.go @@ -0,0 +1,790 @@ +package txqueue + +import ( + "context" + "crypto/ecdsa" + "encoding" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethersphere/swarm/state" +) + +// Backend wraps all methods required for contract deployment. +type Backend interface { + bind.ContractBackend + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) +} + +// WaitFunc is the default function used for waiting for a receipt of a transaction +// this is overridden during in tests to automatically create a block on wait +var WaitFunc = func(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt, error) { + queryTicker := time.NewTicker(time.Second) + defer queryTicker.Stop() + + for { + receipt, err := b.TransactionReceipt(ctx, hash) + if receipt != nil { + return receipt, nil + } + if err != nil { + log.Trace("Receipt retrieval failed", "err", err) + } else { + log.Trace("Transaction not yet mined") + } + // Wait for the next round. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-queryTicker.C: + } + } +} + +// TransactionRequest is the interface for different types of transactions +type TransactionRequest interface { +} + +// TransactionRequestComponent is the interface to be implemented by components using the transaction queue +type TransactionRequestComponent interface { + // GetTypeInstance returns an instance of the requestType or nil if the type is unknown to the component + GetTypeInstance(requestType string) TransactionRequest + // SendTransaction should send the transaction using the backend and opts provided + // opts may be modified, however From, Nonce and Signer must be left untouched + // if the transaction is sent through other means opts.Nonce must be respected (if set to nil, the "pending" nonce must be used) + SendTransactionRequest(id uint64, request TransactionRequest, backend Backend, opts *bind.TransactOpts) (common.Hash, error) + // HandleNotification is called by the transaction queue when a notification for a transaction occurs. + // If the component returns an error the notification will be resent in the future (including across restarts) + HandleNotification(id uint64, notification interface{}) error +} + +// TxQueue represents a central sender for all transactions from a single ethereum account +// its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result +// notifications are guaranteed to happen even across restarts and disconnects from the ethereum backend +type TxQueue struct { + lock sync.Mutex // lock for the entire queue + ctx context.Context // context used for all network requests to ensure the queue can be stopped at any point + cancel context.CancelFunc // function to cancel the above context + wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns + running bool // bool indicating that the queue is running. used to ensure it does not run multiple times simultaneously + + store state.Store // state store to use as the backend + prefix string // all keys in the state store are prefixed with this + requestQueue *Queue // queue for all future requests + components map[string]TransactionRequestComponent // map from component names to their registered value + notificationQueues map[string]*Queue // map from component names to the notification queue of that component + + backend Backend // ethereum backend to use + privateKey *ecdsa.PrivateKey // private key used to sign transactions +} + +// TransactionRequestState is the type used to indicate which state the transaction is in +type TransactionRequestState uint64 + +// SavedTransactionRequest is the metadata the queue saves for every request +type SavedTransactionRequest struct { + Component string // the component which initiated this request + RequestType string // the type of this request + State TransactionRequestState // the state this request is in + Hash common.Hash // the hash of the associated transaction (if already sent) +} + +// NotificationQueueItem is the metadata the queue saves for every pending notification +type NotificationQueueItem struct { + NotificationType string // the type of the notification + RequestID uint64 // the request this notification is for +} + +// TransactionRequestInfo is returned if transaction information is queried through GetRequestInfo +type TransactionRequestInfo struct { + State TransactionRequestState // the state this request is in + Request TransactionRequest // the associated request struct or nil if it could not be decoded + Hash common.Hash // the hash of the associated transaction (if already sent) +} + +// TransactionReceiptNotification is the notification emitted when the receipt is available +type TransactionReceiptNotification struct { + Receipt types.Receipt // the receipt of the included transaction +} + +// TransactionStateChangedNotification is the notification emitted when the state of the request changes +// Note: by the time the component processes the notification, the state might have already changed again +type TransactionStateChangedNotification struct { + OldState TransactionRequestState // the state prior to the change + NewState TransactionRequestState // the state after the change +} + +const ( + // TransactionQueued is the initial state for all requests that enter the queue + TransactionQueued TransactionRequestState = 0 + // TransactionPending means the request is no longer in the queue but not yet confirmed + TransactionPending TransactionRequestState = 1 + // TransactionConfirmed is entered the first time a confirmation is received. This is a terminal state. + TransactionConfirmed TransactionRequestState = 2 + // TransactionStatusUnknown is used for all cases where it is unclear wether the transaction was broadcast or not. This is also used for timed-out transactions. + TransactionStatusUnknown TransactionRequestState = 3 + // TransactionCancelled is used for all cases where it is certain the transaction was and will never be sent + TransactionCancelled TransactionRequestState = 4 + + // retryDelay is the delay between retries in case of io failure + retryDelay = 1 * time.Second +) + +// NewTxQueue creates a new TxQueue +func NewTxQueue(store state.Store, prefix string, backend Backend, privateKey *ecdsa.PrivateKey) *TxQueue { + txq := &TxQueue{ + store: store, + prefix: prefix, + components: make(map[string]TransactionRequestComponent), + notificationQueues: make(map[string]*Queue), + backend: backend, + privateKey: privateKey, + } + txq.requestQueue = txq.newQueue("requestQueue") + txq.ctx, txq.cancel = context.WithCancel(context.Background()) + return txq +} + +// SetComponent registers the component for the given component name +// This starts the delivery of notifications for this component +func (txq *TxQueue) SetComponent(componentName string, component TransactionRequestComponent) { + txq.lock.Lock() + defer txq.lock.Unlock() + if txq.components[componentName] != nil { + log.Error("component %v already set", componentName) + return + } + txq.components[componentName] = component + + // go routine processing the notification queue for this component + txq.wg.Add(1) + go func() { + defer txq.wg.Done() + notifyQ := txq.getNotificationQueue(componentName) + + var item NotificationQueueItem + + for { + // get the next notification item + var key string + done, _ := retry(txq.ctx, func() (err error) { + key, err = notifyQ.next(&item) + return err + }) + if done { + return + } + + // load and decode the notification + var notification interface{} + switch item.NotificationType { + case "TransactionReceiptNotification": + notification = &TransactionReceiptNotification{} + case "TransactionStateChangedNotification": + notification = &TransactionStateChangedNotification{} + } + + done, _ = retry(txq.ctx, func() error { + return txq.store.Get(txq.prefix+"_notification_"+key, notification) + }) + if done { + return + } + + // call the component handler + done, _ = retry(txq.ctx, func() error { + return component.HandleNotification(item.RequestID, notification) + }) + if done { + return + } + + // once the notification was handled delete it from the queue + done, _ = retry(txq.ctx, func() error { + return txq.store.Delete(key) + }) + if done { + return + } + } + }() +} + +// requestKey returns the database key for the SavedTransactionRequest data +func (txq *TxQueue) requestKey(id uint64) string { + return txq.prefix + "_requests_" + strconv.FormatUint(id, 10) +} + +// requestDataKey returns the database key for the custom TransactionRequest +func (txq *TxQueue) requestDataKey(id uint64) string { + return txq.requestKey(id) + "_data" +} + +// activeRequestKey returns the database key used for the currently active request +func (txq *TxQueue) activeRequestKey() string { + return txq.prefix + "_active" +} + +// QueueRequest adds a new request to be processed +// The request is assigned an id which is returned +func (txq *TxQueue) QueueRequest(component string, requestType string, request TransactionRequest) (id uint64, err error) { + txq.lock.Lock() + defer txq.lock.Unlock() + + // get the last id + nonceKey := txq.prefix + "_request_nonce" + err = txq.store.Get(nonceKey, &id) + if err != nil && err != state.ErrNotFound { + return 0, err + } + // ids start at 1 + id++ + + // in a single batch we + // * store the request data + // * store the request metadata + // * add it to the queue + batch := new(state.StoreBatch) + batch.Put(nonceKey, id) + if err := batch.Put(txq.requestDataKey(id), request); err != nil { + return 0, err + } + + err = batch.Put(txq.requestKey(id), &SavedTransactionRequest{ + Component: component, + RequestType: requestType, + State: TransactionQueued, + }) + if err != nil { + return 0, err + } + + _, err, triggerQueue := txq.requestQueue.queue(batch, id) + if err != nil { + return 0, err + } + + // persist to disk + // here we do not need to retry since we can inform the calling component of the error directly + err = txq.store.WriteBatch(batch) + if err != nil { + return 0, err + } + + triggerQueue() + + return id, nil +} + +// getTransactionRequest load the serialized transaction request from disk and tries to decode it +// must be called with the TxQueue lock held +func (txq *TxQueue) getTransactionRequest(id uint64, savedRequest *SavedTransactionRequest) (TransactionRequest, error) { + var request TransactionRequest + compoment, ok := txq.components[savedRequest.Component] + if !ok { + return nil, errors.New("unknown component") + } + request = compoment.GetTypeInstance(savedRequest.RequestType) + if request == nil { + return nil, errors.New("unknown request type") + } + err := txq.store.Get(txq.requestDataKey(id), &request) + if err != nil { + return nil, err + } + return request, nil +} + +// GetRequestInfo can be used externally to query information about the status of a TransactionRequest +func (txq *TxQueue) GetRequestInfo(id uint64) (*TransactionRequestInfo, error) { + txq.lock.Lock() + defer txq.lock.Unlock() + + var savedRequest SavedTransactionRequest + err := txq.store.Get(txq.requestKey(id), &savedRequest) + if err != nil { + return nil, err + } + + request, err := txq.getTransactionRequest(id, &savedRequest) + // if we cannot decode the request we can still return the rest of the info + // this can be useful to query request information even if the corresponding component is not currently registered + if err != nil { + log.Warn("could not decode transaction request", "error", err) + } + + return &TransactionRequestInfo{ + State: savedRequest.State, + Hash: savedRequest.Hash, + Request: request, + }, nil +} + +// Start starts processing transactions if it is not already doing so +func (txq *TxQueue) Start() { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.running { + return + } + + txq.running = true + txq.wg.Add(1) + go func() { + err := txq.loop() + if err != nil { + log.Error("transaction queue terminated with an error", "error", err) + } + txq.wg.Done() + }() +} + +// Stop stops processing transactions if it is running +// It will block until processing has terminated +func (txq *TxQueue) Stop() { + txq.lock.Lock() + + if !txq.running { + return + } + + txq.running = false + txq.cancel() + + txq.lock.Unlock() + // wait until all routines have finished + txq.wg.Wait() +} + +// Queue represents a queue stored in the TxQueues state store +type Queue struct { + // queue puts the necessary database operations for the queueing into the supplied batch + // it returns the generated key and a trigger function which must be called if the batch was successfully written + // the lock of the TxQueue should be held when this is called. it must be ensured queue is not called on the same Queue simultaneously + // this only returns an error if the encoding fails which is an unrecoverable error + queue func(*state.StoreBatch, interface{}) (key string, err error, trigger func()) + // peek looks at the next item in the queue + // the error returned is either an decode or an io error + peek func(i interface{}) (key string, exists bool, err error) + // next looks at the next item in the queue and blocks until an item is available if there is none + // the lock of the TxQueue must not be held when this is called + // the error returned is either an decode error, an io error or a cancelled context + next func(i interface{}) (key string, err error) +} + +func (txq *TxQueue) newQueue(prefix string) *Queue { + triggerChan := make(chan struct{}, 1) + var nonce uint64 = 0 + queuePrefix := txq.prefix + "_" + prefix + "_" + + queue := func(b *state.StoreBatch, v interface{}) (key string, err error, trigger func()) { + nonce++ + key = queuePrefix + time.Now().String() + "_" + strconv.FormatUint(nonce, 10) + if err = b.Put(key, v); err != nil { + return "", err, nil + } + + return key, nil, func() { + select { + case triggerChan <- struct{}{}: + default: + } + } + } + + peek := func(i interface{}) (key string, exists bool, err error) { + err = txq.store.Iterate(queuePrefix, func(k, data []byte) (bool, error) { + key = string(k) + unmarshaler, ok := i.(encoding.BinaryUnmarshaler) + if !ok { + return true, json.Unmarshal(data, i) + } + return true, unmarshaler.UnmarshalBinary(data) + }) + if err != nil { + return "", false, err + } + if key == "" { + return "", false, nil + } + return key, true, nil + } + + next := func(i interface{}) (key string, err error) { + key, exists, err := peek(i) + if err != nil { + return "", err + } + if exists { + return key, nil + } + + for { + select { + case <-triggerChan: + key, exists, err = peek(i) + if err != nil { + return "", err + } + if exists { + return key, nil + } + case <-txq.ctx.Done(): + return "", txq.ctx.Err() + } + } + } + + return &Queue{ + queue: queue, + peek: peek, + next: next, + } +} + +// retry tries executing the supplied function until it returns without error or the context is cancelled +// all errors (except for the context cancelled) are logged here +// the purpose of this function is for io errors where access to the database might be restored +// if it returns false the function was successful, otherwise the context was cancelled and the last error (non-ContextCancelled error) is returned +func retry(ctx context.Context, f func() error) (bool, error) { + var lastError error + for { + err := f() + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Error("Error in queue", "error", err.Error()) + lastError = err + } + select { + case <-ctx.Done(): + return true, lastError + case <-time.After(retryDelay): + continue + } + } + return false, nil + } +} + +// getNotificationQueue gets the notification queue for a component +// it initializes the struct if it does not yet exist +// must be called with the TxQueue lock held +func (txq *TxQueue) getNotificationQueue(component string) *Queue { + queue, ok := txq.notificationQueues[component] + if !ok { + queue = txq.newQueue("notify_" + component) + txq.notificationQueues[component] = queue + } + return queue +} + +// updateRequestStatus is a helper function to change the state of a transaction request while also emitting a notification +// in one write batch it +// * adds a TransactionStateChangedNotification notification to the notification queue +// * stores the corresponding notification +// * updates the state of savedRequest and persists it +// it returns the trigger function which must be called once the batch was written +// must be called with the TxQueue lock held +// this only returns an error if the encoding fails which is an unrecoverable error +func (txq *TxQueue) updateRequestStatus(batch *state.StoreBatch, id uint64, savedRequest *SavedTransactionRequest, newState TransactionRequestState) (finish func(), err error) { + nQueue := txq.getNotificationQueue(savedRequest.Component) + key, err, triggerQueue := nQueue.queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TransactionStateChangedNotification", + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification queue item (this is a bug): %v", err) + } + + err = batch.Put(txq.prefix+"_notification_"+key, &TransactionStateChangedNotification{ + OldState: savedRequest.State, + NewState: newState, + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification (this is a bug): %v", err) + } + savedRequest.State = newState + batch.Put(txq.requestKey(id), savedRequest) + + return triggerQueue, nil +} + +// loop is the main transaction processing function of the TxQueue +// first it checks if there already is an active request. If so it processes this first +// then it will take request from the queue in a loop and execute those +func (txq *TxQueue) loop() error { + // get the stored active request key + // if nothing is stored id will remain 0 (which is invalid as ids start with 1) + var id uint64 + done, err := retry(txq.ctx, func() error { + err := txq.store.Get(txq.activeRequestKey(), &id) + if err != state.ErrNotFound { + return err + } + return nil + }) + if done { + return err + } + + // if there is a non-zero id there is an active request + if id != 0 { + log.Info("Continuing to monitor previous transaction") + // load the request metadata + var savedRequest SavedTransactionRequest + done, err := retry(txq.ctx, func() (err error) { + return txq.store.Get(txq.requestKey(id), &savedRequest) + }) + if done { + return err + } + + switch savedRequest.State { + // if the transaction is still in the Queued state we cannot know for sure where the process terminated + // with a very high likelihood the transaction was not yet sent, but we cannot be sure of that + // the transaction is marked as TransactionStatusUnknown and removed as the active transaction + // in that rare case nonce issue might arise in subsequent requests + case TransactionQueued: + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &savedRequest, TransactionStatusUnknown) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return err + } + batch.Delete(txq.activeRequestKey()) + done, err := retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + finish() + // if the transaction is in the pending state this means we were previously waiting for the transaction + case TransactionPending: + // this only returns an error if the encoding fails which is an unrecoverable error + if err := txq.waitForActiveTransaction(id, &savedRequest); err != nil { + return err + } + default: + // this indicates a client bug + log.Error("found active transaction in unexpected state") + if err := txq.store.Delete(txq.activeRequestKey()); err != nil { + return err + } + } + } +l: + for txq.running { + // terminate the loop if the context was cancelled + select { + case <-txq.ctx.Done(): + break l + default: + } + + // get the id of the next request in the queue + var id uint64 + var key string + done, err := retry(txq.ctx, func() (err error) { + key, err = txq.requestQueue.next(&id) + return err + }) + if done { + return err + } + + // load the request metadata + var savedRequest SavedTransactionRequest + done, err = retry(txq.ctx, func() (err error) { + return txq.store.Get(txq.requestKey(id), &savedRequest) + }) + if done { + return err + } + + // try to decode the actual request data + request, err := txq.getTransactionRequest(id, &savedRequest) + if err != nil { + // if we get here we couldn't decode the transaction data, most likely because the sending component was not registered + // we mark the request as cancelled and remove it from the queue + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &savedRequest, TransactionCancelled) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return err + } + + batch.Delete(key) + + done, err = retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + + finish() + continue l + } + + // if the request was successfully decoded it is removed from the queue and set as the active request + batch := new(state.StoreBatch) + err = batch.Put(txq.activeRequestKey(), id) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return fmt.Errorf("could not put id write into batch (this is a bug): %v", err) + } + + batch.Delete(key) + + done, err = retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + + // finally we call the component to send the actual transaction + component := txq.components[savedRequest.Component] + opts := bind.NewKeyedTransactor(txq.privateKey) + hash, err := component.SendTransactionRequest(id, request, txq.backend, opts) + if err != nil { + // even if SendTransactionRequest returns an error there are still certain rare edge cases where the transaction might still be sent so we mark it as status unknown + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &savedRequest, TransactionStatusUnknown) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + done, err = retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + finish() + continue l + } + + // if we have a hash we mark the transaction as pending + batch = new(state.StoreBatch) + savedRequest.Hash = hash + finish, err := txq.updateRequestStatus(batch, id, &savedRequest, TransactionPending) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + done, err = retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + + finish() + + err = txq.waitForActiveTransaction(id, &savedRequest) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("error while waiting for transaction: %v", err) + } + } + return nil +} + +// waitForActiveTransaction waits for savedRequest to be mined and resets the active transaction afterwards +// the transaction will also be considered mine once the notification was queued successfully +// this only returns an error if the encoding fails which is an unrecoverable error +func (txq *TxQueue) waitForActiveTransaction(id uint64, savedRequest *SavedTransactionRequest) error { + ctx, cancel := context.WithTimeout(txq.ctx, 20*time.Minute) + defer cancel() + + // an error here means the context was cancelled + receipt, err := WaitFunc(ctx, txq.backend, savedRequest.Hash) + // regardless of the outcome, hold the global TxQueue lock for the rest of this function + txq.lock.Lock() + defer txq.lock.Unlock() + if err != nil { + // if the main context of the TxQueue was cancelled we log and return + if txq.ctx.Err() != nil { + log.Warn("terminating transaction queue while waiting for a transaction") + return nil + } + + // if the timeout context expired we mark the transaction status as unknown + // future versions of the queue (with local nonce-tracking) should keep note of that and reuse the nonce for the next request + log.Warn("transaction timeout reached") + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, savedRequest, TransactionStatusUnknown) + if err != nil { + // unrecoverable decode error + return err + } + + done, err := retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + if done { + return err + } + + finish() + return nil + } + + // if the transaction is mined we need to + // * update the request state and emit the corresponding notification + // * emit a TransactionReceiptNotification + // * reset the active request + notifyQueue := txq.getNotificationQueue(savedRequest.Component) + batch := new(state.StoreBatch) + triggerRequestQueue, err := txq.updateRequestStatus(batch, id, savedRequest, TransactionConfirmed) + if err != nil { + // unrecoverable decode error + return err + } + + key, err, triggerNotifyQueue := notifyQueue.queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TransactionReceiptNotification", + }) + if err != nil { + // unrecoverable decode error + return err + } + + batch.Put(txq.prefix+"_notification_"+key, &TransactionReceiptNotification{ + Receipt: *receipt, + }) + + err = batch.Put(txq.activeRequestKey(), nil) + if err != nil { + // unrecoverable decode error + return err + } + + done, err := retry(txq.ctx, func() error { + return txq.store.WriteBatch(batch) + }) + + if done { + return err + } + + triggerRequestQueue() + triggerNotifyQueue() + return nil +} diff --git a/swap/txqueue/txqueue_test.go b/swap/txqueue/txqueue_test.go new file mode 100644 index 0000000000..b3315b063e --- /dev/null +++ b/swap/txqueue/txqueue_test.go @@ -0,0 +1,305 @@ +package txqueue + +import ( + "context" + "errors" + "fmt" + "math/big" + "reflect" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/testutil" +) + +func init() { + testutil.Init() +} + +var ( + senderKey, _ = crypto.HexToECDSA("634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd") + senderAddress = crypto.PubkeyToAddress(senderKey.PublicKey) +) + +var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ + senderAddress: {Balance: big.NewInt(1000000000000000000)}, +}, 8000000) + +// when testing, we don't need to wait for a transaction to be mined +func testWaitForTx(ctx context.Context, backend Backend, hash common.Hash) (*types.Receipt, error) { + + var stb *backends.SimulatedBackend + var ok bool + if stb, ok = backend.(*backends.SimulatedBackend); !ok { + return nil, errors.New("not the expected test backend") + } + stb.Commit() + + receipt, err := backend.TransactionReceipt(ctx, hash) + if err != nil { + return nil, err + } + if receipt == nil { + return nil, errors.New("receipt was nil") + } + return receipt, nil +} + +// setupContractTest is a helper function for setting up the +// blockchain wait function for testing +func setupContractTest() func() { + // we overwrite the waitForTx function with one which the simulated backend + // immediately commits + currentWaitFunc := WaitFunc + // overwrite only for the duration of the test, so... + WaitFunc = testWaitForTx + return func() { + // ...we need to set it back to original when done + WaitFunc = currentWaitFunc + } +} + +type TestRequest struct { + Value uint64 +} + +type TxQueueTester struct { + lock sync.Mutex + chans map[uint64](chan interface{}) + hashes chan common.Hash + backend Backend +} + +func newTxQueueTester(backend Backend) *TxQueueTester { + return &TxQueueTester{ + backend: backend, + chans: make(map[uint64]chan interface{}), + hashes: make(chan common.Hash, 10), + } +} + +func (tc *TxQueueTester) getChan(id uint64) chan interface{} { + tc.lock.Lock() + defer tc.lock.Unlock() + c, ok := tc.chans[id] + if !ok { + c = make(chan interface{}) + tc.chans[id] = c + } + return c +} + +func (tc *TxQueueTester) HandleNotification(id uint64, notification interface{}) error { + tc.getChan(id) <- notification + return nil +} + +func (*TxQueueTester) GetTypeInstance(requestType string) TransactionRequest { + switch requestType { + case "TestRequest": + return &TestRequest{} + } + return nil +} + +func (tc *TxQueueTester) expectStateChangedNotification(ctx context.Context, id uint64, oldState TransactionRequestState, newState TransactionRequestState) error { + var n interface{} + select { + case n = <-tc.getChan(id): + case <-ctx.Done(): + return ctx.Err() + } + + notification, ok := n.(*TransactionStateChangedNotification) + if !ok { + return fmt.Errorf("wrong type. got %v, expected TransactionStateChangedNotification", reflect.TypeOf(notification)) + } + + if notification.OldState != oldState { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.OldState, oldState) + } + + if notification.NewState != newState { + return fmt.Errorf("wrong new state. got %v, expected %v", notification.NewState, newState) + } + + return nil +} + +func (tc *TxQueueTester) expectReceiptNotification(ctx context.Context, id uint64, hash common.Hash) error { + var n interface{} + select { + case n = <-tc.getChan(id): + case <-ctx.Done(): + return ctx.Err() + } + + receipt, err := tc.backend.TransactionReceipt(context.Background(), hash) + if err != nil { + return err + } + if receipt == nil { + return errors.New("no receipt found for transaction") + } + + notification, ok := n.(*TransactionReceiptNotification) + if !ok { + return fmt.Errorf("wrong type. got %v, expected TransactionReceiptNotification", reflect.TypeOf(notification)) + } + + if notification.Receipt.TxHash != receipt.TxHash { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.Receipt.TxHash, receipt.TxHash) + } + + return nil +} + +func (tc *TxQueueTester) SendTransactionRequest(id uint64, request TransactionRequest, backend Backend, opts *bind.TransactOpts) (hash common.Hash, err error) { + var nonce uint64 + if opts.Nonce == nil { + nonce, err = backend.PendingNonceAt(opts.Context, opts.From) + if err != nil { + return common.Hash{}, err + } + } else { + nonce = opts.Nonce.Uint64() + } + + signed, err := opts.Signer(types.HomesteadSigner{}, opts.From, types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(int64(10000000)), []byte{})) + if err != nil { + return common.Hash{}, err + } + err = backend.SendTransaction(opts.Context, signed) + if err != nil { + return common.Hash{}, err + } + tc.hashes <- signed.Hash() + return signed.Hash(), nil +} + +func TestTxQueueNewQueue(t *testing.T) { + clean := setupContractTest() + defer clean() + store := state.NewInmemoryStore() + txq := NewTxQueue(store, "test", defaultBackend, senderKey) + + queue := txq.newQueue("testq") + + var wg sync.WaitGroup + wg.Add(2) + + var keys [10]string + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + var value uint64 + key, err := queue.next(&value) + if err != nil { + t.Fatal(err) + } + + if key != keys[i] { + t.Fatalf("keys don't match: got %v, expected %v", key, keys[i]) + } + + if value != uint64(i) { + t.Fatalf("values don't match: got %v, expected %v", value, i) + } + + err = store.Delete(key) + if err != nil { + t.Fatal(err) + } + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + var value = uint64(i) + batch := new(state.StoreBatch) + key, err, trigger := queue.queue(batch, value) + keys[i] = key + if err != nil { + t.Fatal(err) + } + err = store.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + + trigger() + } + }() + + wg.Wait() +} + +func TestTxQueue(t *testing.T) { + clean := setupContractTest() + defer clean() + store := state.NewInmemoryStore() + txq := NewTxQueue(store, "test", defaultBackend, senderKey) + tc := newTxQueueTester(defaultBackend) + txq.SetComponent("test", tc) + + testRequest := &TestRequest{ + Value: 100, + } + + id, err := txq.QueueRequest("test", "TestRequest", testRequest) + if err != nil { + t.Fatal(err) + } + + if id != 1 { + t.Fatal("expected id to be 1") + } + + info, err := txq.GetRequestInfo(id) + if err != nil { + t.Fatal(err) + } + + if info.State != TransactionQueued { + t.Fatal("expected transaction to be queued") + } + + if info.Request.(*TestRequest).Value != 100 { + t.Fatal("expected values to match") + } + + txq.Start() + defer txq.Stop() + defer store.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + if err = tc.expectStateChangedNotification(ctx, id, TransactionQueued, TransactionPending); err != nil { + t.Fatal(err) + } + + if err = tc.expectStateChangedNotification(ctx, id, TransactionPending, TransactionConfirmed); err != nil { + t.Fatal(err) + } + + var hash common.Hash + select { + case hash = <-tc.hashes: + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + + if err = tc.expectReceiptNotification(ctx, id, hash); err != nil { + t.Fatal(err) + } +}