From eeb298913453eda4bda7cf61173986363f40215d Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Thu, 6 Feb 2020 09:41:29 +0100 Subject: [PATCH] swap, swap/chain: add transaction queue --- swap/cashout.go | 145 ++++++---- swap/cashout_test.go | 41 ++- swap/chain/backend.go | 9 - swap/chain/persistentqueue.go | 105 +++++++ swap/chain/txqueue.go | 515 ++++++++++++++++++++++++++++++++++ swap/chain/txqueue_test.go | 317 +++++++++++++++++++++ swap/chain/txscheduler.go | 76 +++++ swap/common_test.go | 36 +-- swap/protocol_test.go | 13 +- swap/simulations_test.go | 24 +- swap/swap.go | 45 +-- swap/swap_test.go | 19 +- 12 files changed, 1170 insertions(+), 175 deletions(-) create mode 100644 swap/chain/persistentqueue.go create mode 100644 swap/chain/txqueue.go create mode 100644 swap/chain/txqueue_test.go create mode 100644 swap/chain/txscheduler.go diff --git a/swap/cashout.go b/swap/cashout.go index 9d62250c64..64fdb183dd 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -30,10 +30,9 @@ import ( // CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction const CashChequeBeneficiaryTransactionCost = 50000 -// CashoutProcessor holds all relevant fields needed for processing cashouts -type CashoutProcessor struct { - backend chain.Backend // ethereum backend to use - privateKey *ecdsa.PrivateKey // private key to use +var CashoutRequestTypeID = chain.TransactionRequestTypeID{ + Handler: "cashout", + RequestType: "CashoutRequest", } // CashoutRequest represents a request for a cashout operation @@ -42,42 +41,106 @@ type CashoutRequest struct { Destination common.Address // destination for the payout } -// ActiveCashout stores the necessary information for a cashout in progess -type ActiveCashout struct { - Request CashoutRequest // the request that caused this cashout - TransactionHash common.Hash // the hash of the current transaction for this request +// CashoutProcessor holds all relevant fields needed for processing cashouts +type CashoutProcessor struct { + backend chain.Backend // ethereum backend to use + txScheduler chain.TxScheduler // transaction queue to use + cashoutDone chan *CashoutRequest } // newCashoutProcessor creates a new instance of CashoutProcessor -func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { - return &CashoutProcessor{ - backend: backend, - privateKey: privateKey, +func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { + c := &CashoutProcessor{ + backend: backend, + txScheduler: txScheduler, } + + txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TransactionRequestHandlers{ + Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) { + var request CashoutRequest + if err := c.txScheduler.GetRequest(id, &request); err != nil { + return common.Hash{}, err + } + + cheque := request.Cheque + + otherSwap, err := contract.InstanceAt(cheque.Contract, backend) + if err != nil { + return common.Hash{}, err + } + + tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + if err != nil { + return common.Hash{}, err + } + return tx.Hash(), nil + }, + NotifyReceipt: func(id uint64, notification *chain.TransactionReceiptNotification) error { + var request *CashoutRequest + err := c.txScheduler.GetRequest(id, &request) + if err != nil { + return err + } + + otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend) + if err != nil { + return err + } + + receipt := ¬ification.Receipt + if receipt.Status == 0 { + swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + return nil + } + + result := otherSwap.CashChequeBeneficiaryResult(receipt) + + metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) + + if result.Bounced { + metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) + swapLog.Warn("cheque bounced", "tx", receipt.TxHash) + } + + swapLog.Info("cheque cashed", "honey", request.Cheque.Honey) + + select { + case c.cashoutDone <- request: + default: + } + + return nil + }, + }) + return c } -// cashCheque tries to cash the cheque specified in the request -// after the transaction is sent it waits on its success -func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error { - cheque := request.Cheque - opts := bind.NewKeyedTransactor(c.privateKey) - opts.Context = ctx +func (c *CashoutProcessor) setCashoutDoneChan(cashoutDone chan *CashoutRequest) { + c.cashoutDone = cashoutDone +} - otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) +func (c *CashoutProcessor) submitCheque(request *CashoutRequest) { + expectedPayout, transactionCosts, err := c.estimatePayout(context.TODO(), &request.Cheque) if err != nil { - return err + swapLog.Error("could not estimate payout", "error", err) + return } - tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + costsMultiplier := uint256.FromUint64(2) + costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) if err != nil { - return err + swapLog.Error("overflow in transaction fee", "error", err) + return } - // this blocks until the cashout has been successfully processed - return c.waitForAndProcessActiveCashout(&ActiveCashout{ - Request: *request, - TransactionHash: tx.Hash(), - }) + // do a payout transaction if we get 2 times the gas costs + if expectedPayout.Cmp(costThreshold) == 1 { + _, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request) + if err != nil { + metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) + swapLog.Error("cashing cheque:", "error", err) + } + } } // estimatePayout estimates the payout for a given cheque as well as the transaction cost @@ -123,31 +186,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) ( return expectedPayout, transactionCosts, nil } - -// waitForAndProcessActiveCashout waits for activeCashout to complete -func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error { - ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout) - defer cancel() - - receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash) - if err != nil { - return err - } - - otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend) - if err != nil { - return err - } - - result := otherSwap.CashChequeBeneficiaryResult(receipt) - - metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) - - if result.Bounced { - metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) - swapLog.Warn("cheque bounced", "tx", receipt.TxHash) - } - - swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey) - return nil -} diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 18d56da4ce..1cdf92ed31 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -19,9 +19,11 @@ package swap import ( "context" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/log" + "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/swap/chain" "github.com/ethersphere/swarm/uint256" ) @@ -33,8 +35,7 @@ import ( // afterwards it attempts to cash-in a bouncing cheque func TestContractIntegration(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) @@ -116,11 +117,15 @@ func TestContractIntegration(t *testing.T) { // TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque func TestCashCheque(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) - payout := uint256.FromUint64(42) + store := state.NewInmemoryStore() + defer store.Close() + transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey) + payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) if err != nil { @@ -132,12 +137,18 @@ func TestCashCheque(t *testing.T) { t.Fatal(err) } - err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ + cashChequeDone := make(chan *CashoutRequest) + defer close(cashChequeDone) + cashoutProcessor.setCashoutDoneChan(cashChequeDone) + + cashoutProcessor.submitCheque(&CashoutRequest{ Cheque: *testCheque, Destination: ownerAddress, }) - if err != nil { - t.Fatal(err) + + select { + case <-cashChequeDone: + case <-time.After(5 * time.Second): } paidOut, err := chequebook.PaidOut(nil, ownerAddress) @@ -154,10 +165,14 @@ func TestCashCheque(t *testing.T) { // TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout func TestEstimatePayout(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() - - cashoutProcessor := newCashoutProcessor(backend, ownerKey) + defer backend.Close() + + store := state.NewInmemoryStore() + defer store.Close() + transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey) payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) diff --git a/swap/chain/backend.go b/swap/chain/backend.go index 18837f34bc..c7230dc867 100644 --- a/swap/chain/backend.go +++ b/swap/chain/backend.go @@ -2,7 +2,6 @@ package chain import ( "context" - "errors" "time" "github.com/ethereum/go-ethereum/log" @@ -12,11 +11,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var ( - // ErrTransactionReverted is given when the transaction that cashes a cheque is reverted - ErrTransactionReverted = errors.New("Transaction reverted") -) - // Backend is the minimum amount of functionality required by the underlying ethereum backend type Backend interface { bind.ContractBackend @@ -36,9 +30,6 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt log.Trace("Receipt retrieval failed", "err", err) } if receipt != nil { - if receipt.Status != types.ReceiptStatusSuccessful { - return nil, ErrTransactionReverted - } return receipt, nil } diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go new file mode 100644 index 0000000000..0025df813c --- /dev/null +++ b/swap/chain/persistentqueue.go @@ -0,0 +1,105 @@ +package chain + +import ( + "context" + "encoding" + "encoding/json" + "strconv" + "strings" + "sync" + "time" + + "github.com/ethersphere/swarm/state" +) + +// PersistentQueue represents a queue stored in a state store +type PersistentQueue struct { + lock sync.Mutex + store state.Store + prefix string + trigger chan struct{} + nonce uint64 +} + +// NewPersistentQueue creates a structure to interact with a queue with the given prefix +func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue { + return &PersistentQueue{ + store: store, + prefix: prefix, + trigger: make(chan struct{}, 1), + nonce: 0, + } +} + +// Queue puts the necessary database operations for the queueing into the supplied batch +// it returns the generated key and a trigger function which must be called if the batch was successfully written +// this only returns an error if the encoding fails which is an unrecoverable error +func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { + pq.lock.Lock() + defer pq.lock.Unlock() + + pq.nonce++ + key = time.Now().String() + "_" + strconv.FormatUint(pq.nonce, 10) + if err = b.Put(pq.prefix+key, v); err != nil { + return "", nil, err + } + + return key, func() { + select { + case pq.trigger <- struct{}{}: + default: + } + }, nil +} + +// Peek looks at the next item in the queue +// the error returned is either an decode or an io error +func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) { + err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) { + key = string(k) + unmarshaler, ok := i.(encoding.BinaryUnmarshaler) + if !ok { + return true, json.Unmarshal(data, i) + } + return true, unmarshaler.UnmarshalBinary(data) + }) + if err != nil { + return "", false, err + } + if key == "" { + return "", false, nil + } + return strings.TrimPrefix(key, pq.prefix), true, nil +} + +// Next looks at the next item in the queue and blocks until an item is available if there is none +// the error returned is either an decode error, an io error or a cancelled context +func (pq *PersistentQueue) Next(ctx context.Context, i interface{}) (key string, err error) { + key, exists, err := pq.Peek(i) + if err != nil { + return "", err + } + if exists { + return key, nil + } + + for { + select { + case <-pq.trigger: + key, exists, err = pq.Peek(i) + if err != nil { + return "", err + } + if exists { + return key, nil + } + case <-ctx.Done(): + return "", ctx.Err() + } + } +} + +// Delete adds the batch operation to delete the queue element with the given key +func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) { + b.Delete(pq.prefix + key) +} diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go new file mode 100644 index 0000000000..7b1be0c7f8 --- /dev/null +++ b/swap/chain/txqueue.go @@ -0,0 +1,515 @@ +package chain + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethersphere/swarm/state" +) + +// TxQueue is a TxScheduler which sends transactions in sequence +// A new transaction is only sent after the previous one confirmed +type TxQueue struct { + lock sync.Mutex // lock for the entire queue + ctx context.Context // context used for all network requests to ensure the queue can be stopped at any point + cancel context.CancelFunc // function to cancel the above context + wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns + running bool // bool indicating that the queue is running. used to ensure it does not run multiple times simultaneously + + store state.Store // state store to use as the backend + prefix string // all keys in the state store are prefixed with this + requestQueue *PersistentQueue // queue for all future requests + handlers map[TransactionRequestTypeID]*TransactionRequestHandlers // map from request type ids to their registered handlers + notificationQueues map[TransactionRequestTypeID]*PersistentQueue // map from request type ids to the notification queue of that handler + + backend Backend // ethereum backend to use + privateKey *ecdsa.PrivateKey // private key used to sign transactions +} + +// TransactionRequestMetadata is the metadata the queue saves for every request +type TransactionRequestMetadata struct { + RequestTypeID TransactionRequestTypeID // the type id of this request + State TransactionRequestState // the state this request is in + Hash common.Hash // the hash of the associated transaction (if already sent) +} + +// NotificationQueueItem is the metadata the queue saves for every pending notification +type NotificationQueueItem struct { + NotificationType string // the type of the notification + RequestID uint64 // the request this notification is for +} + +// NewTxQueue creates a new TxQueue +func NewTxQueue(store state.Store, prefix string, backend Backend, privateKey *ecdsa.PrivateKey) *TxQueue { + txq := &TxQueue{ + store: store, + prefix: prefix, + handlers: make(map[TransactionRequestTypeID]*TransactionRequestHandlers), + notificationQueues: make(map[TransactionRequestTypeID]*PersistentQueue), + backend: backend, + privateKey: privateKey, + requestQueue: NewPersistentQueue(store, prefix+"_requestQueue_"), + } + txq.ctx, txq.cancel = context.WithCancel(context.Background()) + return txq +} + +// requestKey returns the database key for the TransactionRequestMetadata data +func (txq *TxQueue) requestKey(id uint64) string { + return txq.prefix + "_requests_" + strconv.FormatUint(id, 10) +} + +// requestDataKey returns the database key for the custom TransactionRequest +func (txq *TxQueue) requestDataKey(id uint64) string { + return txq.requestKey(id) + "_data" +} + +// activeRequestKey returns the database key used for the currently active request +func (txq *TxQueue) activeRequestKey() string { + return txq.prefix + "_active" +} + +// requestDataKey returns the database key for the custom TransactionRequest +func (txq *TxQueue) notificationKey(key string) string { + return txq.prefix + "_notification_" + key +} + +// SetHandlers registers the handlers for the given handler +// This starts the delivery of notifications for this handler +func (txq *TxQueue) SetHandlers(requestTypeID TransactionRequestTypeID, handlers *TransactionRequestHandlers) { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.handlers[requestTypeID] != nil { + log.Error("handlers for %v.%v already set", requestTypeID.Handler, requestTypeID.RequestType) + return + } + txq.handlers[requestTypeID] = handlers + + // go routine processing the notification queue for this handler + txq.wg.Add(1) + go func() { + defer txq.wg.Done() + notifyQ := txq.getNotificationQueue(requestTypeID) + + for { + var item NotificationQueueItem + // get the next notification item + key, err := notifyQ.Next(txq.ctx, &item) + if err != nil { + return + } + + // load and decode the notification + var notification interface{} + switch item.NotificationType { + case "TransactionReceiptNotification": + notification = &TransactionReceiptNotification{} + case "TransactionStateChangedNotification": + notification = &TransactionStateChangedNotification{} + } + + err = txq.store.Get(txq.notificationKey(key), notification) + if err != nil { + return + } + + switch item.NotificationType { + case "TransactionReceiptNotification": + if handlers.NotifyReceipt != nil { + err = handlers.NotifyReceipt(item.RequestID, notification.(*TransactionReceiptNotification)) + } + case "TransactionStateChangedNotification": + if handlers.NotifyStateChanged != nil { + err = handlers.NotifyStateChanged(item.RequestID, notification.(*TransactionStateChangedNotification)) + } + } + if err != nil { + select { + case <-txq.ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + + // once the notification was handled delete it from the queue + batch := new(state.StoreBatch) + notifyQ.Delete(batch, key) + err = txq.store.WriteBatch(batch) + if err != nil { + return + } + } + }() +} + +// ScheduleRequest adds a new request to be processed +// The request is assigned an id which is returned +func (txq *TxQueue) ScheduleRequest(requestTypeID TransactionRequestTypeID, request interface{}) (id uint64, err error) { + txq.lock.Lock() + defer txq.lock.Unlock() + + // get the last id + idKey := txq.prefix + "_request_id" + err = txq.store.Get(idKey, &id) + if err != nil && err != state.ErrNotFound { + return 0, err + } + // ids start at 1 + id++ + + // in a single batch we + // * store the request data + // * store the request metadata + // * add it to the queue + batch := new(state.StoreBatch) + batch.Put(idKey, id) + if err := batch.Put(txq.requestDataKey(id), request); err != nil { + return 0, err + } + + err = batch.Put(txq.requestKey(id), &TransactionRequestMetadata{ + RequestTypeID: requestTypeID, + State: TransactionQueued, + }) + if err != nil { + return 0, err + } + + _, triggerQueue, err := txq.requestQueue.Queue(batch, id) + if err != nil { + return 0, err + } + + // persist to disk + err = txq.store.WriteBatch(batch) + if err != nil { + return 0, err + } + + triggerQueue() + + return id, nil +} + +// GetRequest load the serialized transaction request from disk and tries to decode it +func (txq *TxQueue) GetRequest(id uint64, request interface{}) error { + return txq.store.Get(txq.requestDataKey(id), &request) +} + +// Start starts processing transactions if it is not already doing so +func (txq *TxQueue) Start() { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.running { + return + } + + txq.running = true + txq.wg.Add(1) + go func() { + err := txq.loop() + if err != nil && !errors.Is(err, context.Canceled) { + log.Error("transaction queue terminated with an error", "error", err) + } + txq.wg.Done() + }() +} + +// Stop stops processing transactions if it is running +// It will block until processing has terminated +func (txq *TxQueue) Stop() { + txq.lock.Lock() + + if !txq.running { + return + } + + txq.running = false + txq.cancel() + + txq.lock.Unlock() + // wait until all routines have finished + txq.wg.Wait() +} + +// getNotificationQueue gets the notification queue for a handler +// it initializes the struct if it does not yet exist +// the TxQueue lock must not be held +func (txq *TxQueue) getNotificationQueue(requestTypeID TransactionRequestTypeID) *PersistentQueue { + txq.lock.Lock() + defer txq.lock.Unlock() + queue, ok := txq.notificationQueues[requestTypeID] + if !ok { + queue = NewPersistentQueue(txq.store, txq.prefix+"_notify_"+requestTypeID.Handler+"_"+requestTypeID.RequestType) + txq.notificationQueues[requestTypeID] = queue + } + return queue +} + +// updateRequestStatus is a helper function to change the state of a transaction request while also emitting a notification +// in one write batch it +// * adds a TransactionStateChangedNotification notification to the notification queue +// * stores the corresponding notification +// * updates the state of requestMetadata and persists it +// it returns the trigger function which must be called once the batch was written +// this only returns an error if the encoding fails which is an unrecoverable error +func (txq *TxQueue) updateRequestStatus(batch *state.StoreBatch, id uint64, requestMetadata *TransactionRequestMetadata, newState TransactionRequestState) (finish func(), err error) { + nQueue := txq.getNotificationQueue(requestMetadata.RequestTypeID) + key, triggerQueue, err := nQueue.Queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TransactionStateChangedNotification", + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification queue item (this is a bug): %v", err) + } + + err = batch.Put(txq.notificationKey(key), &TransactionStateChangedNotification{ + OldState: requestMetadata.State, + NewState: newState, + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification (this is a bug): %v", err) + } + requestMetadata.State = newState + batch.Put(txq.requestKey(id), requestMetadata) + + return triggerQueue, nil +} + +// loop is the main transaction processing function of the TxQueue +// first it checks if there already is an active request. If so it processes this first +// then it will take request from the queue in a loop and execute those +func (txq *TxQueue) loop() error { + // get the stored active request key + // if nothing is stored id will remain 0 (which is invalid as ids start with 1) + var id uint64 + err := txq.store.Get(txq.activeRequestKey(), &id) + if err != state.ErrNotFound { + return err + } + + // if there is a non-zero id there is an active request + if id != 0 { + log.Info("Continuing to monitor previous transaction") + // load the request metadata + var requestMetadata TransactionRequestMetadata + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return err + } + + switch requestMetadata.State { + // if the transaction is still in the Queued state we cannot know for sure where the process terminated + // with a very high likelihood the transaction was not yet sent, but we cannot be sure of that + // the transaction is marked as TransactionStatusUnknown and removed as the active transaction + // in that rare case nonce issue might arise in subsequent requests + case TransactionQueued: + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &requestMetadata, TransactionStatusUnknown) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return err + } + batch.Delete(txq.activeRequestKey()) + if err := txq.store.WriteBatch(batch); err != nil { + return err + } + finish() + // if the transaction is in the pending state this means we were previously waiting for the transaction + case TransactionPending: + // this only returns an error if the encoding fails which is an unrecoverable error + if err := txq.waitForActiveTransaction(id, &requestMetadata); err != nil { + return err + } + default: + // this indicates a client bug + log.Error("found active transaction in unexpected state") + if err := txq.store.Delete(txq.activeRequestKey()); err != nil { + return err + } + } + } +l: + for txq.running { + // terminate the loop if the context was cancelled + select { + case <-txq.ctx.Done(): + break l + default: + } + + // get the id of the next request in the queue + var id uint64 + key, err := txq.requestQueue.Next(txq.ctx, &id) + if err != nil { + return err + } + + // load the request metadata + var requestMetadata TransactionRequestMetadata + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return err + } + + txq.lock.Lock() + handlers := txq.handlers[requestMetadata.RequestTypeID] + txq.lock.Unlock() + if handlers == nil { + // if there is no handler for this handler available we mark the request as cancelled and remove it from the queue + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &requestMetadata, TransactionCancelled) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return err + } + + txq.requestQueue.Delete(batch, key) + err = txq.store.WriteBatch(batch) + if err != nil { + return err + } + + finish() + continue l + } + + // if the request was successfully decoded it is removed from the queue and set as the active request + batch := new(state.StoreBatch) + err = batch.Put(txq.activeRequestKey(), id) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return fmt.Errorf("could not put id write into batch (this is a bug): %v", err) + } + + txq.requestQueue.Delete(batch, key) + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + // finally we call the handler to send the actual transaction + opts := bind.NewKeyedTransactor(txq.privateKey) + hash, err := handlers.Send(id, txq.backend, opts) + if err != nil { + // even if SendTransactionRequest returns an error there are still certain rare edge cases where the transaction might still be sent so we mark it as status unknown + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, &requestMetadata, TransactionStatusUnknown) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + finish() + continue l + } + + // if we have a hash we mark the transaction as pending + batch = new(state.StoreBatch) + requestMetadata.Hash = hash + finish, err := txq.updateRequestStatus(batch, id, &requestMetadata, TransactionPending) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + finish() + + err = txq.waitForActiveTransaction(id, &requestMetadata) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("error while waiting for transaction: %v", err) + } + } + return nil +} + +// waitForActiveTransaction waits for requestMetadata to be mined and resets the active transaction afterwards +// the transaction will also be considered mine once the notification was queued successfully +// this only returns an error if the encoding fails which is an unrecoverable error +func (txq *TxQueue) waitForActiveTransaction(id uint64, requestMetadata *TransactionRequestMetadata) error { + ctx, cancel := context.WithTimeout(txq.ctx, 20*time.Minute) + defer cancel() + + // an error here means the context was cancelled + receipt, err := WaitMined(ctx, txq.backend, requestMetadata.Hash) + if err != nil { + // if the main context of the TxQueue was cancelled we log and return + if txq.ctx.Err() != nil { + log.Warn("terminating transaction queue while waiting for a transaction") + return nil + } + + // if the timeout context expired we mark the transaction status as unknown + // future versions of the queue (with local nonce-tracking) should keep note of that and reuse the nonce for the next request + log.Error("transaction timeout reached") + batch := new(state.StoreBatch) + finish, err := txq.updateRequestStatus(batch, id, requestMetadata, TransactionStatusUnknown) + if err != nil { + return err + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + finish() + return nil + } + + // if the transaction is mined we need to + // * update the request state and emit the corresponding notification + // * emit a TransactionReceiptNotification + // * reset the active request + notifyQueue := txq.getNotificationQueue(requestMetadata.RequestTypeID) + batch := new(state.StoreBatch) + triggerRequestQueue, err := txq.updateRequestStatus(batch, id, requestMetadata, TransactionConfirmed) + if err != nil { + // unrecoverable decode error + return err + } + + key, triggerNotifyQueue, err := notifyQueue.Queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TransactionReceiptNotification", + }) + if err != nil { + // unrecoverable decode error + return err + } + + batch.Put(txq.notificationKey(key), &TransactionReceiptNotification{ + Receipt: *receipt, + }) + + err = batch.Put(txq.activeRequestKey(), nil) + if err != nil { + return err + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + triggerRequestQueue() + triggerNotifyQueue() + return nil +} diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go new file mode 100644 index 0000000000..178ee0f936 --- /dev/null +++ b/swap/chain/txqueue_test.go @@ -0,0 +1,317 @@ +package chain + +import ( + "context" + "errors" + "fmt" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethersphere/swarm/state" + mock "github.com/ethersphere/swarm/swap/chain/mock" + "github.com/ethersphere/swarm/testutil" +) + +func init() { + testutil.Init() +} + +var ( + senderKey, _ = crypto.HexToECDSA("634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd") + senderAddress = crypto.PubkeyToAddress(senderKey.PublicKey) +) + +var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ + senderAddress: {Balance: big.NewInt(1000000000000000000)}, +}, 8000000) + +func newTestBackend() *mock.TestBackend { + return mock.NewTestBackend(defaultBackend) +} + +var TestRequestTypeID = TransactionRequestTypeID{ + Handler: "test", + RequestType: "TestRequest", +} + +type TestRequest struct { + Value uint64 +} + +type TxSchedulerTester struct { + lock sync.Mutex + chans map[uint64]*TxSchedulerTesterRequest + backend Backend +} + +type TxSchedulerTesterRequest struct { + ReceiptNotification chan *TransactionReceiptNotification + StateChangedNotification chan *TransactionStateChangedNotification + hash common.Hash +} + +func newTxSchedulerTester(backend Backend, txq TxScheduler) *TxSchedulerTester { + t := &TxSchedulerTester{ + backend: backend, + chans: make(map[uint64]*TxSchedulerTesterRequest), + } + txq.SetHandlers(TestRequestTypeID, &TransactionRequestHandlers{ + Send: t.SendTransactionRequest, + NotifyReceipt: func(id uint64, notification *TransactionReceiptNotification) error { + t.getRequest(id).ReceiptNotification <- notification + return nil + }, + NotifyStateChanged: func(id uint64, notification *TransactionStateChangedNotification) error { + t.getRequest(id).StateChangedNotification <- notification + return nil + }, + }) + return t +} + +func (tc *TxSchedulerTester) getRequest(id uint64) *TxSchedulerTesterRequest { + tc.lock.Lock() + defer tc.lock.Unlock() + c, ok := tc.chans[id] + if !ok { + tc.chans[id] = &TxSchedulerTesterRequest{ + ReceiptNotification: make(chan *TransactionReceiptNotification), + StateChangedNotification: make(chan *TransactionStateChangedNotification), + } + return tc.chans[id] + } + return c +} + +func (tc *TxSchedulerTester) expectStateChangedNotification(ctx context.Context, id uint64, oldState TransactionRequestState, newState TransactionRequestState) error { + var notification *TransactionStateChangedNotification + select { + case notification = <-tc.getRequest(id).StateChangedNotification: + case <-ctx.Done(): + return ctx.Err() + } + + if notification.OldState != oldState { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.OldState, oldState) + } + + if notification.NewState != newState { + return fmt.Errorf("wrong new state. got %v, expected %v", notification.NewState, newState) + } + + return nil +} + +func (tc *TxSchedulerTester) expectReceiptNotification(ctx context.Context, id uint64) error { + var notification *TransactionReceiptNotification + request := tc.getRequest(id) + select { + case notification = <-request.ReceiptNotification: + case <-ctx.Done(): + return ctx.Err() + } + + receipt, err := tc.backend.TransactionReceipt(context.Background(), request.hash) + if err != nil { + return err + } + if receipt == nil { + return errors.New("no receipt found for transaction") + } + + if notification.Receipt.TxHash != request.hash { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.Receipt.TxHash, request.hash) + } + + return nil +} + +func (tc *TxSchedulerTester) SendTransactionRequest(id uint64, backend Backend, opts *bind.TransactOpts) (hash common.Hash, err error) { + var nonce uint64 + if opts.Nonce == nil { + nonce, err = backend.PendingNonceAt(opts.Context, opts.From) + if err != nil { + return common.Hash{}, err + } + } else { + nonce = opts.Nonce.Uint64() + } + + signed, err := opts.Signer(types.HomesteadSigner{}, opts.From, types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(int64(10000000)), []byte{})) + if err != nil { + return common.Hash{}, err + } + err = backend.SendTransaction(opts.Context, signed) + if err != nil { + return common.Hash{}, err + } + tc.getRequest(id).hash = signed.Hash() + return signed.Hash(), nil +} + +func setupTxQueueTest(run bool) (*TxQueue, func()) { + backend := newTestBackend() + store := state.NewInmemoryStore() + txq := NewTxQueue(store, "test", backend, senderKey) + if run { + txq.Start() + } + return txq, func() { + if run { + txq.Stop() + } + store.Close() + backend.Close() + } +} + +func TestNewPersistentQueue(t *testing.T) { + store := state.NewInmemoryStore() + defer store.Close() + + queue := NewPersistentQueue(store, "testq") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + var wg sync.WaitGroup + wg.Add(2) + + var errout error + var keys [10]string + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + var value uint64 + key, err := queue.Next(ctx, &value) + if err != nil { + errout = fmt.Errorf("failed to get next item: %v", err) + return + } + + if key != keys[i] { + errout = fmt.Errorf("keys don't match: got %v, expected %v", key, keys[i]) + return + } + + if value != uint64(i) { + errout = fmt.Errorf("values don't match: got %v, expected %v", value, i) + return + } + + batch := new(state.StoreBatch) + queue.Delete(batch, key) + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("could not write batch: %v", err) + return + } + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + var value = uint64(i) + batch := new(state.StoreBatch) + key, trigger, err := queue.Queue(batch, value) + keys[i] = key + if err != nil { + errout = fmt.Errorf("failed to queue item: %v", err) + return + } + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("failed to write batch: %v", err) + return + } + + trigger() + } + }() + + wg.Wait() + + if errout != nil { + t.Fatal(errout) + } +} + +func TestTxQueue(t *testing.T) { + txq, clean := setupTxQueueTest(false) + defer clean() + tc := newTxSchedulerTester(txq.backend, txq) + + testRequest := &TestRequest{ + Value: 100, + } + + id, err := txq.ScheduleRequest(TestRequestTypeID, testRequest) + if err != nil { + t.Fatal(err) + } + + if id != 1 { + t.Fatal("expected id to be 1") + } + + txq.Start() + defer txq.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + if err = tc.expectStateChangedNotification(ctx, id, TransactionQueued, TransactionPending); err != nil { + t.Fatal(err) + } + + if err = tc.expectStateChangedNotification(ctx, id, TransactionPending, TransactionConfirmed); err != nil { + t.Fatal(err) + } + + if err = tc.expectReceiptNotification(ctx, id); err != nil { + t.Fatal(err) + } +} + +func TestTxQueueManyRequests(t *testing.T) { + txq, clean := setupTxQueueTest(true) + defer clean() + tc := newTxSchedulerTester(txq.backend, txq) + + var ids []uint64 + count := 200 + for i := 0; i < count; i++ { + id, err := txq.ScheduleRequest(TestRequestTypeID, 5) + if err != nil { + t.Fatal(err) + } + + ids = append(ids, id) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + for _, id := range ids { + err := tc.expectStateChangedNotification(ctx, id, TransactionQueued, TransactionPending) + if err != nil { + t.Fatal(err) + } + err = tc.expectStateChangedNotification(ctx, id, TransactionPending, TransactionConfirmed) + if err != nil { + t.Fatal(err) + } + err = tc.expectReceiptNotification(ctx, id) + if err != nil { + t.Fatal(err) + } + } +} diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go new file mode 100644 index 0000000000..a2393af620 --- /dev/null +++ b/swap/chain/txscheduler.go @@ -0,0 +1,76 @@ +package chain + +import ( + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// TxScheduler represents a central sender for all transactions from a single ethereum account +// its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result +// notifications are guaranteed to happen even across restarts and disconnects from the ethereum backend +type TxScheduler interface { + // SetHandlers registers the handlers for the given requestTypeID + // This starts the delivery of notifications for this requestTypeID + SetHandlers(requestTypeID TransactionRequestTypeID, handlers *TransactionRequestHandlers) + // ScheduleRequest adds a new request to be processed + // The request is assigned an id which is returned + ScheduleRequest(requestTypeID TransactionRequestTypeID, request interface{}) (id uint64, err error) + // GetRequest load the serialized transaction request from disk and tries to decode it + GetRequest(id uint64, request interface{}) error + // Start starts processing transactions if it is not already doing so + Start() + // Stop stops processing transactions if it is running + // It will block until processing has terminated + Stop() +} + +// TransactionRequestTypeID is a combination of a handler and a request type +// All requests with a given TransactionRequestTypeID are handled the same +type TransactionRequestTypeID struct { + Handler string + RequestType string +} + +// TransactionRequestHandlers holds all the callbacks for a given TransactionRequestTypeID +// Any of the functions may be nil +type TransactionRequestHandlers struct { + // Send should send the transaction using the backend and opts provided + // opts may be modified, however From, Nonce and Signer must be left untouched + // if the transaction is sent through other means opts.Nonce must be respected (if set to nil, the "pending" nonce must be used) + Send func(id uint64, backend Backend, opts *bind.TransactOpts) (common.Hash, error) + // Notify functions are called by the transaction queue when a notification for a transaction occurs. + // If the handler returns an error the notification will be resent in the future (including across restarts) + // NotifyReceipt is called the first time a receipt is observed for a transaction + NotifyReceipt func(id uint64, notification *TransactionReceiptNotification) error + // NotifyStateChanged is called every time the transaction status changes + NotifyStateChanged func(id uint64, notification *TransactionStateChangedNotification) error +} + +// TransactionRequestState is the type used to indicate which state the transaction is in +type TransactionRequestState uint8 + +// TransactionReceiptNotification is the notification emitted when the receipt is available +type TransactionReceiptNotification struct { + Receipt types.Receipt // the receipt of the included transaction +} + +// TransactionStateChangedNotification is the notification emitted when the state of the request changes +// Note: by the time the handler processes the notification, the state might have already changed again +type TransactionStateChangedNotification struct { + OldState TransactionRequestState // the state prior to the change + NewState TransactionRequestState // the state after the change +} + +const ( + // TransactionQueued is the initial state for all requests that enter the queue + TransactionQueued TransactionRequestState = 0 + // TransactionPending means the request is no longer in the queue but not yet confirmed + TransactionPending TransactionRequestState = 1 + // TransactionConfirmed is entered the first time a confirmation is received. This is a terminal state. + TransactionConfirmed TransactionRequestState = 2 + // TransactionStatusUnknown is used for all cases where it is unclear wether the transaction was broadcast or not. This is also used for timed-out transactions. + TransactionStatusUnknown TransactionRequestState = 3 + // TransactionCancelled is used for all cases where it is certain the transaction was and will never be sent + TransactionCancelled TransactionRequestState = 4 +) diff --git a/swap/common_test.go b/swap/common_test.go index 9fb2ca6343..e774fa50c8 100644 --- a/swap/common_test.go +++ b/swap/common_test.go @@ -34,8 +34,6 @@ type swapTestBackend struct { *mock.TestBackend factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network tokenAddress common.Address // address of the token in the simulated network - // the async cashing go routine needs synchronization for tests - cashDone chan struct{} } var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ @@ -104,7 +102,9 @@ func newBaseTestSwapWithParams(t *testing.T, key *ecdsa.PrivateKey, params *Para if err != nil { t.Fatal(err) } - swap := newSwapInstance(stateStore, owner, backend, 10, params, factory) + + txqueue := chain.NewTxQueue(stateStore, "chain", backend, owner.privateKey) + swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, txqueue) return swap, dir } @@ -125,6 +125,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend) usedBackend = newTestBackend(t) } swap, dir := newBaseTestSwap(t, key, usedBackend) + swap.txScheduler.Start() clean := func() { swap.Close() // only close if created by newTestSwap to avoid double close @@ -205,32 +206,6 @@ func newRandomTestCheque() *Cheque { return cheque } -// During tests, because the cashing in of cheques is async, we should wait for the function to be returned -// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`, -// causing subsequent TX to possibly fail due to nonce mismatch -func testCashCheque(s *Swap, cheque *Cheque) { - cashCheque(s, cheque) - // send to the channel, signals to clients that this function actually finished - if stb, ok := s.backend.(*swapTestBackend); ok { - if stb.cashDone != nil { - stb.cashDone <- struct{}{} - } - } -} - -// setupContractTest is a helper function for setting up the -// blockchain wait function for testing -func setupContractTest() func() { - // we also need to store the previous cashCheque function in case this is called multiple times - currentCashCheque := defaultCashCheque - defaultCashCheque = testCashCheque - // overwrite only for the duration of the test, so... - return func() { - // ...we need to set it back to original when done - defaultCashCheque = currentCashCheque - } -} - // deploy for testing (needs simulated backend commit) func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) { opts := bind.NewKeyedTransactor(privateKey) @@ -247,9 +222,6 @@ func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privat return nil, err } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() contract, err := factory.DeploySimpleSwap(opts, ownerAddress, big.NewInt(int64(defaultHarddepositTimeoutDuration))) if err != nil { return nil, err diff --git a/swap/protocol_test.go b/swap/protocol_test.go index 1b3ad93764..a61dbbf639 100644 --- a/swap/protocol_test.go +++ b/swap/protocol_test.go @@ -233,11 +233,9 @@ func TestEmitCheque(t *testing.T) { debitorSwap, cleanDebitorSwap := newTestSwap(t, beneficiaryKey, testBackend) defer cleanDebitorSwap() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now we need to create the channel... - testBackend.cashDone = make(chan struct{}) + cashChequeDone := make(chan *CashoutRequest) + defer close(cashChequeDone) + creditorSwap.cashoutProcessor.setCashoutDoneChan(cashChequeDone) log.Debug("deploy to simulated backend") @@ -319,7 +317,7 @@ func TestEmitCheque(t *testing.T) { // we wait until the cashCheque is actually terminated (ensures proper nounce count) select { - case <-creditorSwap.backend.(*swapTestBackend).cashDone: + case <-cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transaction to complete") @@ -342,9 +340,6 @@ func TestTriggerPaymentThreshold(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() if err = protocolTester.testHandshake( correctSwapHandshakeMsg(debitorSwap), diff --git a/swap/simulations_test.go b/swap/simulations_test.go index 8a180939ce..0281deaa5c 100644 --- a/swap/simulations_test.go +++ b/swap/simulations_test.go @@ -47,6 +47,7 @@ import ( "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/chain" mock "github.com/ethersphere/swarm/swap/chain/mock" "github.com/ethersphere/swarm/uint256" ) @@ -62,6 +63,7 @@ For integration tests, run test cluster deployments with all integration moduele (blockchains, oracles, etc.) */ // swapSimulationParams allows to avoid global variables for the test + type swapSimulationParams struct { swaps map[int]*Swap dirs map[int]string @@ -165,9 +167,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic if err != nil { return nil, nil, err } + ts.swap.cashoutProcessor.txScheduler.Start() cleanup = func() { - ts.swap.store.Close() + ts.swap.Close() os.RemoveAll(dir) } @@ -244,7 +247,8 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, if err != nil { t.Fatal(err) } - params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory) + txqueue := chain.NewTxQueue(stores[i], "chain", testBackend, owner.privateKey) + params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory, txqueue) } params.backend = testBackend @@ -266,12 +270,6 @@ func TestMultiChequeSimulation(t *testing.T) { // cleanup backend defer params.backend.Close() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - - params.backend.cashDone = make(chan struct{}, 1) - defer close(params.backend.cashDone) // initialize the simulation sim := simulation.NewBzzInProc(newSimServiceMap(params), false) defer sim.Close() @@ -303,6 +301,10 @@ func TestMultiChequeSimulation(t *testing.T) { // get the testService for the creditor creditorSvc := sim.Service("swap", creditor).(*testService) + cashChequeDone := make(chan *CashoutRequest, 1) + defer close(cashChequeDone) + creditorSvc.swap.cashoutProcessor.setCashoutDoneChan(cashChequeDone) + var debLen, credLen, debSwapLen, credSwapLen int timeout := time.After(10 * time.Second) for { @@ -377,7 +379,7 @@ func TestMultiChequeSimulation(t *testing.T) { balanceAfterMessage := debitorBalance - int64(msgPrice) if balanceAfterMessage <= -paymentThreshold { // we need to wait a bit in order to give time for the cheque to be processed - if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil { + if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout, cashChequeDone); err != nil { t.Fatal(err) } expectedPayout += uint64(-balanceAfterMessage) @@ -613,7 +615,7 @@ func TestBasicSwapSimulation(t *testing.T) { log.Info("Simulation ended") } -func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64) error { +func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64, cashChequeDone <-chan *CashoutRequest) error { t.Helper() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() @@ -636,7 +638,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr lock.Unlock() wg.Done() return - case <-backend.cashDone: + case <-cashChequeDone: wg.Done() return } diff --git a/swap/swap.go b/swap/swap.go index 1577907343..3850ac6e2a 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -68,6 +68,7 @@ type Swap struct { chequebookFactory contract.SimpleSwapFactory // the chequebook factory used honeyPriceOracle HoneyOracle // oracle which resolves the price of honey (in Wei) cashoutProcessor *CashoutProcessor // processor for cashing out + txScheduler chain.TxScheduler // transaction scheduler to use } // Owner encapsulates information related to accessing the contract @@ -136,7 +137,7 @@ func swapRotatingFileHandler(logdir string) (log.Handler, error) { } // newSwapInstance is a swap constructor function without integrity checks -func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory) *Swap { +func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory, txScheduler chain.TxScheduler) *Swap { return &Swap{ store: stateStore, peers: make(map[enode.ID]*Peer), @@ -146,7 +147,8 @@ func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend chequebookFactory: chequebookFactory, honeyPriceOracle: NewHoneyPriceOracle(), chainID: chainID, - cashoutProcessor: newCashoutProcessor(backend, owner.privateKey), + cashoutProcessor: newCashoutProcessor(txScheduler, backend, owner.privateKey), + txScheduler: txScheduler, } } @@ -209,12 +211,15 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par chainID.Uint64(), params, factory, + chain.NewTxQueue(stateStore, "chain", backend, owner.privateKey), ) // start the chequebook if swap.contract, err = swap.StartChequebook(chequebookAddressFlag); err != nil { return nil, err } + swap.txScheduler.Start() + // deposit money in the chequebook if desired if !skipDepositFlag { // prompt the user for a depositAmount @@ -391,8 +396,6 @@ func (s *Swap) handleMsg(p *Peer) func(ctx context.Context, msg interface{}) err } } -var defaultCashCheque = cashCheque - // handleEmitChequeMsg should be handled by the creditor when it receives // a cheque from a debitor func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitChequeMsg) error { @@ -435,21 +438,10 @@ func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitCheque return protocols.Break(err) } - expectedPayout, transactionCosts, err := s.cashoutProcessor.estimatePayout(context.TODO(), cheque) - if err != nil { - return protocols.Break(err) - } - - costsMultiplier := uint256.FromUint64(2) - costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) - if err != nil { - return err - } - - // do a payout transaction if we get 2 times the gas costs - if expectedPayout.Cmp(costThreshold) == 1 { - go defaultCashCheque(s, cheque) - } + go s.cashoutProcessor.submitCheque(&CashoutRequest{ + Cheque: *cheque, + Destination: s.GetParams().ContractAddress, + }) return nil } @@ -489,20 +481,6 @@ func (s *Swap) handleConfirmChequeMsg(ctx context.Context, p *Peer, msg *Confirm return nil } -// cashCheque should be called async as it blocks until the transaction(s) are mined -// The function cashes the cheque by sending it to the blockchain -func cashCheque(s *Swap, cheque *Cheque) { - err := s.cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ - Cheque: *cheque, - Destination: s.GetParams().ContractAddress, - }) - - if err != nil { - metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - swapLog.Error("cashing cheque:", "error", err) - } -} - // processAndVerifyCheque verifies the cheque and compares it with the last received cheque // if the cheque is valid it will also be saved as the new last cheque func (s *Swap) processAndVerifyCheque(cheque *Cheque, p *Peer) (*uint256.Uint256, error) { @@ -612,6 +590,7 @@ func (s *Swap) saveBalance(p enode.ID, balance int64) error { // Close cleans up swap func (s *Swap) Close() error { + s.txScheduler.Stop() return s.store.Close() } diff --git a/swap/swap_test.go b/swap/swap_test.go index cc0656bdb1..e3b786cf3b 100644 --- a/swap/swap_test.go +++ b/swap/swap_test.go @@ -641,10 +641,6 @@ func TestResetBalance(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -667,8 +663,11 @@ func TestResetBalance(t *testing.T) { msg := &EmitChequeMsg{ Cheque: cheque, } - // now we need to create the channel... - testBackend.cashDone = make(chan struct{}) + + cashChequeDone := make(chan *CashoutRequest) + defer close(cashChequeDone) + creditorSwap.cashoutProcessor.setCashoutDoneChan(cashChequeDone) + // ...and trigger message handling on the receiver side (creditor) // remember that debitor is the model of the remote node for the creditor... err = creditorSwap.handleEmitChequeMsg(ctx, debitor, msg) @@ -677,7 +676,7 @@ func TestResetBalance(t *testing.T) { } // ...on which we wait until the cashCheque is actually terminated (ensures proper nounce count) select { - case <-testBackend.cashDone: + case <-cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transactions to complete") @@ -1294,10 +1293,6 @@ func TestSwapLogToFile(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -1351,8 +1346,6 @@ func TestAvailableBalance(t *testing.T) { defer testBackend.Close() swap, clean := newTestSwap(t, ownerKey, testBackend) defer clean() - cleanup := setupContractTest() - defer cleanup() depositAmount := uint256.FromUint64(9000 * RetrieveRequestPrice)