From 422ae584562511ab3ddd7922bf5120a6719dcf02 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 3 Mar 2020 19:22:37 +0100 Subject: [PATCH 01/18] swap/chain: add persistent queue --- swap/chain/common_test.go | 7 ++ swap/chain/persistentqueue.go | 121 +++++++++++++++++++++++++++++ swap/chain/persistentqueue_test.go | 93 ++++++++++++++++++++++ 3 files changed, 221 insertions(+) create mode 100644 swap/chain/common_test.go create mode 100644 swap/chain/persistentqueue.go create mode 100644 swap/chain/persistentqueue_test.go diff --git a/swap/chain/common_test.go b/swap/chain/common_test.go new file mode 100644 index 0000000000..38d9109af3 --- /dev/null +++ b/swap/chain/common_test.go @@ -0,0 +1,7 @@ +package chain + +import "github.com/ethersphere/swarm/testutil" + +func init() { + testutil.Init() +} diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go new file mode 100644 index 0000000000..e6695f2f19 --- /dev/null +++ b/swap/chain/persistentqueue.go @@ -0,0 +1,121 @@ +package chain + +import ( + "context" + "encoding" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/ethersphere/swarm/state" +) + +/* + PersistentQueue represents a queue stored in a state store + Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce in case two items get queued at the same time + It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time + To allow atomic operations with other state store operations all functions only write to batches instead of writing to the store directly + The user must ensure that all functions (except Next) are called with the same lock held which is provided externally so multiple queues can use the same + The queue provides no dequeue function. Instead an item must be deleted by its key +*/ + +// PersistentQueue represents a queue stored in a state store +type PersistentQueue struct { + store state.Store // the store backing this queue + prefix string // the prefix for the keys for this queue + trigger chan struct{} // channel to notify the queue that a new item is available + nonce uint64 // increasing nonce. starts with 0 on every startup +} + +// NewPersistentQueue creates a structure to interact with a queue with the given prefix +func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue { + return &PersistentQueue{ + store: store, + prefix: prefix, + trigger: make(chan struct{}, 1), + nonce: 0, + } +} + +// Queue puts the necessary database operations for enqueueing a new item into the supplied batch +// It returns the generated key and a trigger function which must be called once the batch was successfully written +// This only returns an error if the encoding fails which is an unrecoverable error +// A lock must be held and kept until after the trigger function was called or the batch write failed +func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { + // the nonce guarantees keys don't collide if multiple transactions are queued in the same second + pq.nonce++ + key = fmt.Sprintf("%d_%08d", time.Now().Unix(), pq.nonce) + if err = b.Put(pq.prefix+key, v); err != nil { + return "", nil, err + } + + return key, func() { + select { + case pq.trigger <- struct{}{}: + default: + } + }, nil +} + +// Peek looks at the next item in the queue +// The error returned is either a decode or an io error +// A lock must be held when this is called and should be held afterwards to prevent the item from being removed while processing +func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) { + err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) { + key = string(k) + unmarshaler, ok := i.(encoding.BinaryUnmarshaler) + if !ok { + return true, json.Unmarshal(data, i) + } + return true, unmarshaler.UnmarshalBinary(data) + }) + if err != nil { + return "", false, err + } + if key == "" { + return "", false, nil + } + return strings.TrimPrefix(key, pq.prefix), true, nil +} + +// Next looks at the next item in the queue and blocks until an item is available if there is none +// The error returned is either an decode error, an io error or a cancelled context +// No lock should not be held when this is called. Only a single call to next may be active at any time +// If the the key is not "", the value exists, the supplied lock was acquired and must be released by the caller after processing the item +// The supplied lock should be the same that is used for the other functions +func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) { + lock.Lock() + key, exists, err := pq.Peek(i) + if exists { + return key, nil + } + lock.Unlock() + if err != nil { + return "", err + } + + for { + select { + case <-pq.trigger: + lock.Lock() + key, exists, err = pq.Peek(i) + if exists { + return key, nil + } + lock.Unlock() + if err != nil { + return "", err + } + case <-ctx.Done(): + return "", ctx.Err() + } + } +} + +// Delete adds the batch operation to delete the queue element with the given key +// A lock must be held when the batch is written +func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) { + b.Delete(pq.prefix + key) +} diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go new file mode 100644 index 0000000000..b866175a1c --- /dev/null +++ b/swap/chain/persistentqueue_test.go @@ -0,0 +1,93 @@ +package chain + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethersphere/swarm/state" +) + +// TestNewPersistentQueue adds 200 elements in one routine and waits for them and then deletes them in another +func TestNewPersistentQueue(t *testing.T) { + store := state.NewInmemoryStore() + defer store.Close() + + queue := NewPersistentQueue(store, "testq") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + var lock sync.Mutex // lock for the queue + var wg sync.WaitGroup // wait group to wait for both routines to terminate + wg.Add(2) + + count := 200 + + var errout error // stores the last error that occurred in one of the routines + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + func() { // this is a function so we can use defer with the right scope + var value uint64 + key, err := queue.Next(ctx, &value, &lock) + if err != nil { + errout = fmt.Errorf("failed to get next item: %v", err) + return + } + defer lock.Unlock() + + if key == "" { + errout = errors.New("key is empty") + return + } + + if value != uint64(i) { + errout = fmt.Errorf("values don't match: got %v, expected %v", value, i) + return + } + + batch := new(state.StoreBatch) + queue.Delete(batch, key) + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("could not write batch: %v", err) + return + } + }() + } + }() + + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + func() { // this is a function so we can use defer with the right scope + lock.Lock() + defer lock.Unlock() + + var value = uint64(i) + batch := new(state.StoreBatch) + _, trigger, err := queue.Queue(batch, value) + if err != nil { + errout = fmt.Errorf("failed to queue item: %v", err) + return + } + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("failed to write batch: %v", err) + return + } + + trigger() + }() + } + }() + + wg.Wait() + + if errout != nil { + t.Fatal(errout) + } +} From 65c5899dd1acbbf852caa05988462c5439bf5151 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 3 Mar 2020 19:22:46 +0100 Subject: [PATCH 02/18] swap/chain: add txqueue --- swap/chain/backend.go | 12 +- swap/chain/txqueue.go | 639 +++++++++++++++++++++++++++++++++++++ swap/chain/txqueue_test.go | 454 ++++++++++++++++++++++++++ swap/chain/txscheduler.go | 127 ++++++++ 4 files changed, 1222 insertions(+), 10 deletions(-) create mode 100644 swap/chain/txqueue.go create mode 100644 swap/chain/txqueue_test.go create mode 100644 swap/chain/txscheduler.go diff --git a/swap/chain/backend.go b/swap/chain/backend.go index 54ad6b55b1..1efc4d13f7 100644 --- a/swap/chain/backend.go +++ b/swap/chain/backend.go @@ -2,7 +2,6 @@ package chain import ( "context" - "errors" "time" "github.com/ethereum/go-ethereum/log" @@ -12,11 +11,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var ( - // ErrTransactionReverted is given when the transaction that cashes a cheque is reverted - ErrTransactionReverted = errors.New("Transaction reverted") -) - // Backend is the minimum amount of functionality required by the underlying ethereum backend type Backend interface { bind.ContractBackend @@ -30,12 +24,10 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt for { receipt, err := b.TransactionReceipt(ctx, hash) if err != nil { - log.Error("receipt retrieval failed", "err", err) + // some clients treat an unconfirmed transaction as an error, other simply return null + log.Trace("receipt retrieval failed", "err", err) } if receipt != nil { - if receipt.Status != types.ReceiptStatusSuccessful { - return nil, ErrTransactionReverted - } return receipt, nil } diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go new file mode 100644 index 0000000000..f8dac5585a --- /dev/null +++ b/swap/chain/txqueue.go @@ -0,0 +1,639 @@ +package chain + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethersphere/swarm/state" +) + +// TxQueue is a TxScheduler which sends transactions in sequence +// A new transaction is only sent after the previous one confirmed +// This is done to minimize the chance of wrong nonce use +type TxQueue struct { + lock sync.Mutex // lock for the entire queue + ctx context.Context // context used for all network requests and waiting operations to ensure the queue can be stopped at any point + cancel context.CancelFunc // function to cancel the above context + wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns + started bool // bool indicating that the queue has been started. used to ensure it does not run multiple times simultaneously + errorChan chan error // channel to stop the queue in case of errors + + store state.Store // state store to use as the db backend + prefix string // all keys in the state store are prefixed with this + requestQueue *PersistentQueue // queue for all future requests + handlers map[string]*TxRequestHandlers // map from handlerIDs to their registered handlers + notificationQueues map[string]*PersistentQueue // map from handlerIDs to the notification queue of that handler + + backend TxSchedulerBackend // ethereum backend to use + privateKey *ecdsa.PrivateKey // private key used to sign transactions +} + +// txRequestData is the metadata the queue saves for every request +// the extra data is stored at a different key +type txRequestData struct { + ID uint64 // id of the request + Request TxRequest // the request itself + HandlerID string // the type id of this request + State TxRequestState // the state this request is in + Transaction *types.Transaction // the generated transaction for this request or nil if not yet signed +} + +// notificationQueueItem is the metadata the queue saves for every pending notification +// the actual notification content is stored at a different key +type notificationQueueItem struct { + NotificationType string // the type of the notification + RequestID uint64 // the request this notification is for +} + +const ( + txReceiptNotificationType = "TxReceiptNotification" + txPendingNotificationType = "TxPendingNotification" + txCancelledNotificationType = "TxCancelledNotification" + txStatusUnknownNotificationType = "TxStatusUnknownNotification" +) + +// NewTxQueue creates a new TxQueue +func NewTxQueue(store state.Store, prefix string, backend TxSchedulerBackend, privateKey *ecdsa.PrivateKey) *TxQueue { + txq := &TxQueue{ + store: store, + prefix: prefix, + handlers: make(map[string]*TxRequestHandlers), + notificationQueues: make(map[string]*PersistentQueue), + backend: backend, + privateKey: privateKey, + requestQueue: NewPersistentQueue(store, prefix+"_requestQueue_"), + errorChan: make(chan error, 1), + } + // we create the context here already because handlers can be set before the queue starts + txq.ctx, txq.cancel = context.WithCancel(context.Background()) + return txq +} + +// requestKey returns the database key for the txRequestData for the given id +func (txq *TxQueue) requestKey(id uint64) string { + return fmt.Sprintf("%s_requests_%d", txq.prefix, id) +} + +// extraDataKey returns the database key for the extra data stored alongside the request +func (txq *TxQueue) extraDataKey(id uint64) string { + return fmt.Sprintf("%s_data", txq.requestKey(id)) +} + +// activeRequestKey returns the database key used for the currently active request +func (txq *TxQueue) activeRequestKey() string { + return fmt.Sprintf("%s_active", txq.prefix) +} + +// notificationKey returns the database key for a notification +func (txq *TxQueue) notificationKey(key string) string { + return fmt.Sprintf("%s_notification_%s", txq.prefix, key) +} + +// idKey returns the database key for the last used id value +func (txq *TxQueue) idKey() string { + return fmt.Sprintf("%s_request_id", txq.prefix) +} + +// stopWithError sends the error to the error channel +// this is used to stop the queue from notification handlers +func (txq *TxQueue) stopWithError(err error) { + select { + case txq.errorChan <- err: + default: + log.Error("failed to write error to txqueue error channel", "error", err) + } +} + +// ScheduleRequest adds a new request to be processed +// The request is assigned an id which is returned +func (txq *TxQueue) ScheduleRequest(handlerID string, request TxRequest, extraData interface{}) (id uint64, err error) { + txq.lock.Lock() + defer txq.lock.Unlock() + + // get the last id + err = txq.store.Get(txq.idKey(), &id) + if err != nil && err != state.ErrNotFound { + return 0, err + } + // ids start at 1 + id++ + + // in a single batch this + // * stores the request data + // * stores the request extraData + // * adds it to the queue + batch := new(state.StoreBatch) + err = batch.Put(txq.idKey(), id) + if err != nil { + return 0, err + } + + err = batch.Put(txq.extraDataKey(id), extraData) + if err != nil { + return 0, err + } + + err = batch.Put(txq.requestKey(id), &txRequestData{ + ID: id, + Request: request, + HandlerID: handlerID, + State: TxRequestStateScheduled, + }) + if err != nil { + return 0, err + } + + _, triggerQueue, err := txq.requestQueue.Queue(batch, id) + if err != nil { + return 0, err + } + + // persist to disk + err = txq.store.WriteBatch(batch) + if err != nil { + return 0, err + } + + triggerQueue() + return id, nil +} + +// GetExtraData load the serialized extra data for this request from disk and tries to decode it +func (txq *TxQueue) GetExtraData(id uint64, request interface{}) error { + return txq.store.Get(txq.extraDataKey(id), &request) +} + +// GetRequestState gets the state the request is currently in +func (txq *TxQueue) GetRequestState(id uint64) (TxRequestState, error) { + var requestMetadata *txRequestData + err := txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return 0, err + } + return requestMetadata.State, nil +} + +// Start starts processing transactions if it is not already doing so +func (txq *TxQueue) Start() { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.started { + return + } + + txq.started = true + txq.wg.Add(2) + go func() { + defer txq.wg.Done() + // run the actual loop + err := txq.loop() + if err != nil && !errors.Is(err, context.Canceled) { + log.Error("transaction queue terminated with an error", "queue", txq.prefix, "error", err) + } + }() + + go func() { + defer txq.wg.Done() + // listen on the error channel and stop the queue on error + select { + case err := <-txq.errorChan: + log.Error("unrecoverable transaction queue error (transaction processing disabled)", "error", err) + txq.Stop() + case <-txq.ctx.Done(): + } + }() +} + +// Stop stops processing transactions if it is running +// It will block until processing has terminated +func (txq *TxQueue) Stop() { + txq.lock.Lock() + + if !txq.started { + txq.lock.Unlock() + return + } + + // we cancel the context that all long running operations in the queue use + txq.cancel() + txq.lock.Unlock() + // wait until all routines have finished + txq.wg.Wait() +} + +// getNotificationQueue gets the notification queue for a handler +// it initializes the struct if it does not yet exist +// the TxQueue lock must be held +func (txq *TxQueue) getNotificationQueue(handlerID string) *PersistentQueue { + queue, ok := txq.notificationQueues[handlerID] + if !ok { + queue = NewPersistentQueue(txq.store, fmt.Sprintf("%s_notify_%s", txq.prefix, handlerID)) + txq.notificationQueues[handlerID] = queue + } + return queue +} + +// SetHandlers registers the handlers for the given handlerID +// This starts the delivery of notifications for this handlerID +func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) error { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.handlers[handlerID] != nil { + return fmt.Errorf("handlers for %s already set", handlerID) + } + txq.handlers[handlerID] = handlers + notifyQueue := txq.getNotificationQueue(handlerID) + + // go routine processing the notification queue for this handler + txq.wg.Add(1) + go func() { + defer txq.wg.Done() + + for { + var item notificationQueueItem + // get the next notification item + key, err := notifyQueue.Next(txq.ctx, &item, &txq.lock) + if err != nil { + if !errors.Is(err, context.Canceled) { + txq.stopWithError(fmt.Errorf("could not read from notification queue: %v", err)) + } + return + } + // since this is the only function which deletes this item from notifyQueue we can already unlock here + txq.lock.Unlock() + + // load and decode the notification + var notification interface{} + switch item.NotificationType { + case txReceiptNotificationType: + notification = &TxReceiptNotification{} + case txPendingNotificationType: + notification = &TxPendingNotification{} + case txCancelledNotificationType: + notification = &TxCancelledNotification{} + case txStatusUnknownNotificationType: + notification = &TxStatusUnknownNotification{} + } + + err = txq.store.Get(txq.notificationKey(key), notification) + if err != nil { + txq.stopWithError(fmt.Errorf("could not read notification: %v", err)) + return + } + + switch item.NotificationType { + case txReceiptNotificationType: + if handlers.NotifyReceipt != nil { + err = handlers.NotifyReceipt(txq.ctx, item.RequestID, notification.(*TxReceiptNotification)) + } + case txPendingNotificationType: + if handlers.NotifyPending != nil { + err = handlers.NotifyPending(txq.ctx, item.RequestID, notification.(*TxPendingNotification)) + } + case txCancelledNotificationType: + if handlers.NotifyCancelled != nil { + err = handlers.NotifyCancelled(txq.ctx, item.RequestID, notification.(*TxCancelledNotification)) + } + case txStatusUnknownNotificationType: + if handlers.NotifyStatusUnknown != nil { + err = handlers.NotifyStatusUnknown(txq.ctx, item.RequestID, notification.(*TxStatusUnknownNotification)) + } + } + + // if a handler failed we will try again in 10 seconds + if err != nil { + log.Error("transaction request handler failed", "type", item.NotificationType, "request", item.RequestID, "error", err) + select { + case <-txq.ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + + // once the notification was handled delete it from the queue + txq.lock.Lock() + batch := new(state.StoreBatch) + notifyQueue.Delete(batch, key) + err = txq.store.WriteBatch(batch) + txq.lock.Unlock() + if err != nil { + txq.stopWithError(fmt.Errorf("could not delete notification: %v", err)) + return + } + } + }() + return nil +} + +// helper function to trigger a notification +// the returned trigger function must be called once the batch has been written +// must be called with the txqueue lock held +func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, notificationType string, notification interface{}) (triggerNotifyQueue func(), err error) { + notifyQueue := txq.getNotificationQueue(handlerID) + key, triggerNotifyQueue, err := notifyQueue.Queue(batch, ¬ificationQueueItem{ + RequestID: id, + NotificationType: notificationType, + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification queue item: %v", err) + } + + err = batch.Put(txq.notificationKey(key), notification) + if err != nil { + return nil, fmt.Errorf("could not serialize notification: %v", err) + } + return triggerNotifyQueue, nil +} + +// waitForNextRequestLegacy waits for the next request and sets it as the active request +// the txqueue lock must not be held +func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err error) { + var id uint64 + // get the id of the next request in the queue + key, err := txq.requestQueue.Next(txq.ctx, &id, &txq.lock) + if err != nil { + return nil, err + } + defer txq.lock.Unlock() + + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return nil, err + } + + // if the request was successfully decoded it is removed from the queue and set as the active request + batch := new(state.StoreBatch) + err = batch.Put(txq.activeRequestKey(), requestMetadata.ID) + if err != nil { + return nil, fmt.Errorf("could not put id write into batch: %v", err) + } + txq.requestQueue.Delete(batch, key) + + err = txq.store.WriteBatch(batch) + if err != nil { + return nil, err + } + + return requestMetadata, nil +} + +// helper function to set a request state and remove it as the active request in a single batch +// the txqueue lock must be held +func (txq *TxQueue) finalizeRequest(batch *state.StoreBatch, requestMetadata *txRequestData, state TxRequestState) error { + requestMetadata.State = state + err := batch.Put(txq.requestKey(requestMetadata.ID), requestMetadata.ID) + if err != nil { + return err + } + batch.Delete(txq.activeRequestKey()) + return txq.store.WriteBatch(batch) +} + +// helper function to set a request as cancelled and emit the appropriate notification +// the txqueue lock must be held +func (txq *TxQueue) finalizeRequestCancelled(requestMetadata *txRequestData, err error) error { + batch := new(state.StoreBatch) + trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, "TxCancelledNotification", &TxCancelledNotification{ + Reason: err.Error(), + }) + if err != nil { + return err + } + + err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateCancelled) + if err != nil { + return err + } + trigger() + return nil +} + +// helper function to set a request as status unknown and emit the appropriate notification +// the txqueue lock must be held +func (txq *TxQueue) finalizeRequestStatusUnknown(requestMetadata *txRequestData, reason string) error { + batch := new(state.StoreBatch) + trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txStatusUnknownNotificationType, &TxStatusUnknownNotification{ + Reason: reason, + }) + if err != nil { + return err + } + + err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateStatusUnknown) + if err != nil { + return err + } + trigger() + return nil +} + +// helper function to set a request as confirmed and emit the appropriate notification +// the txqueue lock must be held +func (txq *TxQueue) finalizeRequestConfirmed(requestMetadata *txRequestData, receipt types.Receipt) error { + batch := new(state.StoreBatch) + trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txReceiptNotificationType, &TxReceiptNotification{ + Receipt: receipt, + }) + if err != nil { + return err + } + + err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateConfirmed) + if err != nil { + return err + } + trigger() + return nil +} + +// processRequest continues processing the provided request +func (txq *TxQueue) processRequest(requestMetadata *txRequestData) error { + switch requestMetadata.State { + case TxRequestStateScheduled: + err := txq.generateTransaction(requestMetadata) + if err != nil { + return err + } + fallthrough + case TxRequestStateSigned: + err := txq.sendTransaction(requestMetadata) + if err != nil { + return err + } + fallthrough + case TxRequestStatePending: + return txq.waitForActiveTransaction(requestMetadata) + default: + return fmt.Errorf("trying to process transaction in unknown state: %d", requestMetadata.State) + } +} + +// generateTransaction assigns the nonce, signs the resulting transaction and saves it +func (txq *TxQueue) generateTransaction(requestMetadata *txRequestData) error { + opts := bind.NewKeyedTransactor(txq.privateKey) + opts.Context = txq.ctx + + nonce, err := txq.backend.PendingNonceAt(txq.ctx, opts.From) + if err != nil { + return txq.finalizeRequestCancelled(requestMetadata, err) + } + + request := requestMetadata.Request + if request.GasLimit == 0 { + request.GasLimit, err = txq.backend.EstimateGas(txq.ctx, ethereum.CallMsg{ + From: opts.From, + To: &request.To, + Data: request.Data, + }) + if err != nil { + return txq.finalizeRequestCancelled(requestMetadata, err) + } + } + + if request.GasPrice == nil { + request.GasPrice, err = txq.backend.SuggestGasPrice(txq.ctx) + if err != nil { + return txq.finalizeRequestCancelled(requestMetadata, err) + } + } + + tx := types.NewTransaction( + nonce, + request.To, + request.Value, + request.GasLimit, + request.GasPrice, + request.Data, + ) + + requestMetadata.Transaction, err = opts.Signer(&types.HomesteadSigner{}, opts.From, tx) + if err != nil { + return txq.finalizeRequestCancelled(requestMetadata, err) + } + requestMetadata.State = TxRequestStateSigned + return txq.store.Put(txq.requestKey(requestMetadata.ID), requestMetadata) +} + +// sendTransaction sends the signed transaction to the ethereum backend +func (txq *TxQueue) sendTransaction(requestMetadata *txRequestData) error { + err := txq.backend.SendTransactionWithID(txq.ctx, requestMetadata.ID, requestMetadata.Transaction) + txq.lock.Lock() + defer txq.lock.Unlock() + if err != nil { + // even if SendTransactionRequest returns an error there are still certain rare edge cases where the transaction might still be sent so we mark it as status unknown + return txq.finalizeRequestStatusUnknown(requestMetadata, err.Error()) + } + // if we have a hash we mark the transaction as pending + batch := new(state.StoreBatch) + requestMetadata.State = TxRequestStatePending + err = batch.Put(txq.requestKey(requestMetadata.ID), requestMetadata) + if err != nil { + return err + } + trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txPendingNotificationType, &TxPendingNotification{ + Transaction: requestMetadata.Transaction, + }) + if err != nil { + return err + } + err = txq.store.WriteBatch(batch) + if err != nil { + return err + } + trigger() + return nil +} + +// processActiveRequest continues monitoring the active request if there is one +// this is called on startup before the queue begins normal operation +func (txq *TxQueue) processActiveRequest() error { + // get the stored active request key + // if nothing is stored id will remain 0 (which is invalid as ids start with 1) + var id uint64 + err := txq.store.Get(txq.activeRequestKey(), &id) + if err == state.ErrNotFound { + return nil + } + if err != nil { + return err + } + + // load the request metadata + var requestMetadata txRequestData + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return err + } + + // continue processing as regular + return txq.processRequest(&requestMetadata) + +} + +// waitForActiveTransaction waits for requestMetadata to be mined and resets the active transaction afterwards +// the transaction will also be considered mined once the notification was queued successfully +// this only returns an error if the encoding fails which is an unrecoverable error +// the txqueue lock must not be held +func (txq *TxQueue) waitForActiveTransaction(requestMetadata *txRequestData) error { + ctx, cancel := context.WithTimeout(txq.ctx, 20*time.Minute) + defer cancel() + + // an error here means the context was cancelled + receipt, err := WaitMined(ctx, txq.backend, requestMetadata.Transaction.Hash()) + txq.lock.Lock() + defer txq.lock.Unlock() + if err != nil { + // if the main context of the TxQueue was cancelled we log and return + if txq.ctx.Err() != nil { + log.Info("terminating transaction queue while waiting for a transaction", "hash", requestMetadata.Transaction.Hash()) + return nil + } + + // if the timeout context expired we mark the transaction status as unknown + // future versions of the queue (with local nonce-tracking) should keep note of that and reuse the nonce for the next request + log.Warn("transaction timeout reached", "hash", requestMetadata.Transaction.Hash()) + return txq.finalizeRequestStatusUnknown(requestMetadata, "transaction timed out") + } + + return txq.finalizeRequestConfirmed(requestMetadata, *receipt) +} + +// loop is the main transaction processing function of the TxQueue +// first it checks if there already is an active request. If so it processes this first +// then it will take requests from the queue in a loop and execute those +func (txq *TxQueue) loop() error { + err := txq.processActiveRequest() + if err != nil { + return err + } + + for { + select { + case <-txq.ctx.Done(): + return nil + default: + } + + requestMetadata, err := txq.waitForNextRequest() + if err != nil { + return err + } + + err = txq.processRequest(requestMetadata) + if err != nil { + return err + } + } +} diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go new file mode 100644 index 0000000000..97d9971ea5 --- /dev/null +++ b/swap/chain/txqueue_test.go @@ -0,0 +1,454 @@ +package chain + +import ( + "bytes" + "context" + "errors" + "fmt" + "math/big" + "reflect" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethersphere/swarm/state" +) + +var ( + senderKey, _ = crypto.HexToECDSA("634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd") + senderAddress = crypto.PubkeyToAddress(senderKey.PublicKey) +) + +var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ + senderAddress: {Balance: big.NewInt(1000000000000000000)}, +}, 8000000) + +// backend.SendTransaction outcome associated with a request id +type testRequestOutcome struct { + noCommit bool // the backend should not automatically mine the transaction + sendError error // SendTransaction should return with this error +} + +// testTxSchedulerBackend wraps a SimulatedBackend and provides a way to determine the result of SendTransaction +type testTxSchedulerBackend struct { + *backends.SimulatedBackend + requestOutcomes map[uint64]testRequestOutcome // map of request id to outcome + lock sync.Mutex // lock for map access and blocking SendTransactionWithID +} + +func newTestTxSchedulerBackend(backend *backends.SimulatedBackend) *testTxSchedulerBackend { + return &testTxSchedulerBackend{ + SimulatedBackend: backend, + requestOutcomes: make(map[uint64]testRequestOutcome), + } +} + +func (b *testTxSchedulerBackend) SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error { + b.lock.Lock() + defer b.lock.Unlock() + outcome, ok := b.requestOutcomes[id] + if ok { + if outcome.sendError != nil { + return outcome.sendError + } + err := b.SimulatedBackend.SendTransaction(ctx, tx) + if err == nil && !outcome.noCommit { + b.SimulatedBackend.Commit() + } + return err + } + err := b.SimulatedBackend.SendTransaction(ctx, tx) + if err == nil { + b.SimulatedBackend.Commit() + } + return err +} + +const testHandlerID = "test_TestRequest" + +// txSchedulerTester is a helper used for testing TxScheduler implementations +// it saves received notifications to channels so they can easily be checked in tests +// furthermore it can trigger certain errors depending on flags set in the requests +type txSchedulerTester struct { + lock sync.Mutex + txScheduler TxScheduler + chans map[uint64]*txSchedulerTesterRequestData // map from request id to channels + backend *testTxSchedulerBackend +} + +// txSchedulerTesterRequestData is the data txSchedulerTester saves for every request +type txSchedulerTesterRequestData struct { + ReceiptNotification chan *TxReceiptNotification + CancelledNotification chan *TxCancelledNotification + PendingNotification chan *TxPendingNotification + StatusUnknownNotification chan *TxStatusUnknownNotification + request TxRequest +} + +type txSchedulerTesterRequestExtraData struct { +} + +func newTxSchedulerTester(backend *testTxSchedulerBackend, txScheduler TxScheduler) (*txSchedulerTester, error) { + tc := &txSchedulerTester{ + txScheduler: txScheduler, + backend: backend, + chans: make(map[uint64]*txSchedulerTesterRequestData), + } + err := tc.setHandlers(txScheduler) + if err != nil { + return nil, err + } + return tc, nil +} + +// hooks up the TxScheduler handlers to the txSchedulerTester channels +func (tc *txSchedulerTester) setHandlers(txScheduler TxScheduler) error { + return txScheduler.SetHandlers(testHandlerID, &TxRequestHandlers{ + NotifyReceipt: func(ctx context.Context, id uint64, notification *TxReceiptNotification) error { + select { + case tc.getRequest(id).ReceiptNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + NotifyCancelled: func(ctx context.Context, id uint64, notification *TxCancelledNotification) error { + select { + case tc.getRequest(id).CancelledNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + NotifyPending: func(ctx context.Context, id uint64, notification *TxPendingNotification) error { + select { + case tc.getRequest(id).PendingNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *TxStatusUnknownNotification) error { + select { + case tc.getRequest(id).StatusUnknownNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }, + }) +} + +// schedule request with the provided extra data and the transaction outcome +func (tc *txSchedulerTester) schedule(request TxRequest, requestExtraData interface{}, outcome *testRequestOutcome) (uint64, error) { + // this lock here is crucial as it blocks SendTransaction until the requestOutcomes has been set + tc.backend.lock.Lock() + defer tc.backend.lock.Unlock() + id, err := tc.txScheduler.ScheduleRequest(testHandlerID, request, requestExtraData) + if err != nil { + return 0, err + } + if outcome != nil { + tc.backend.requestOutcomes[id] = *outcome + } + + tc.getRequest(id).request = request + return id, nil +} + +// getRequest gets the txSchedulerTesterRequestData for this id or initializes it if it does not yet exist +func (tc *txSchedulerTester) getRequest(id uint64) *txSchedulerTesterRequestData { + tc.lock.Lock() + defer tc.lock.Unlock() + c, ok := tc.chans[id] + if !ok { + tc.chans[id] = &txSchedulerTesterRequestData{ + ReceiptNotification: make(chan *TxReceiptNotification), + PendingNotification: make(chan *TxPendingNotification), + CancelledNotification: make(chan *TxCancelledNotification), + StatusUnknownNotification: make(chan *TxStatusUnknownNotification), + } + return tc.chans[id] + } + return c +} + +// expectStateChangedNotification waits for a StateChangedNotification with the given parameters +func (tc *txSchedulerTester) expectStatusUnknownNotification(ctx context.Context, id uint64, reason string) error { + var notification *TxStatusUnknownNotification + request := tc.getRequest(id) + select { + case notification = <-request.StatusUnknownNotification: + case <-ctx.Done(): + return ctx.Err() + } + + if notification.Reason != reason { + return fmt.Errorf("reason mismatch. got %s, expected %s", notification.Reason, reason) + } + + return nil +} + +func (tc *txSchedulerTester) expectPendingNotification(ctx context.Context, id uint64) error { + var notification *TxPendingNotification + request := tc.getRequest(id) + select { + case notification = <-request.PendingNotification: + case <-ctx.Done(): + return ctx.Err() + } + + tx := notification.Transaction + if !bytes.Equal(tx.Data(), request.request.Data) { + return fmt.Errorf("transaction data mismatch. got %v, expected %v", tx.Data(), request.request.Data) + } + + if *tx.To() != request.request.To { + return fmt.Errorf("transaction to mismatch. got %v, expected %v", tx.To(), request.request.To) + } + + if tx.Value().Cmp(request.request.Value) != 0 { + return fmt.Errorf("transaction value mismatch. got %v, expected %v", tx.Value(), request.request.Value) + } + + return nil +} + +// expectStateChangedNotification waits for a ReceiptNotification for the given request id and verifies its hash +func (tc *txSchedulerTester) expectReceiptNotification(ctx context.Context, id uint64) error { + var notification *TxReceiptNotification + request := tc.getRequest(id) + select { + case notification = <-request.ReceiptNotification: + case <-ctx.Done(): + return ctx.Err() + } + + tx, pending, err := tc.backend.TransactionByHash(ctx, notification.Receipt.TxHash) + if err != nil { + return err + } + if pending { + return errors.New("received a receipt notification for a pending transaction") + } + + if tx == nil { + return errors.New("transaction not found") + } + + if !bytes.Equal(tx.Data(), request.request.Data) { + return fmt.Errorf("transaction data mismatch. got %v, expected %v", tx.Data(), request.request.Data) + } + + if *tx.To() != request.request.To { + return fmt.Errorf("transaction to mismatch. got %v, expected %v", tx.To(), request.request.To) + } + + if tx.Value().Cmp(request.request.Value) != 0 { + return fmt.Errorf("transaction value mismatch. got %v, expected %v", tx.Value(), request.request.Value) + } + + return nil +} + +// makeTestRequest creates a simple test request to the 0x0 address +func makeTestRequest() TxRequest { + return TxRequest{ + To: common.Address{}, + Value: big.NewInt(0), + Data: []byte{}, + } +} + +// helper function for queue tests which sets up everything and provides a cleanup function +// if run is true the queue starts processing requests and cleanup function will wait for proper termination +func setupTxQueueTest(run bool) (*TxQueue, *testTxSchedulerBackend, func()) { + backend := defaultBackend + backend.Commit() + + testBackend := newTestTxSchedulerBackend(backend) + + store := state.NewInmemoryStore() + txq := NewTxQueue(store, "test", testBackend, senderKey) + if run { + txq.Start() + } + return txq, testBackend, func() { + if run { + txq.Stop() + } + store.Close() + } +} + +// TestTxQueueScheduleRequest tests scheduling a single request when the queue is not running +// Afterwards the queue is started and the correct sequence of notifications is expected +func TestTxQueueScheduleRequest(t *testing.T) { + txq, backend, clean := setupTxQueueTest(false) + defer clean() + tc, err := newTxSchedulerTester(backend, txq) + if err != nil { + t.Fatal(err) + } + + testRequest := &txSchedulerTesterRequestExtraData{} + + id, err := tc.schedule(makeTestRequest(), testRequest, nil) + if err != nil { + t.Fatal(err) + } + + if id != 1 { + t.Fatal("expected id to be 1") + } + + var testRequestRetrieved *txSchedulerTesterRequestExtraData + err = txq.GetExtraData(id, &testRequestRetrieved) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(testRequest, testRequestRetrieved) { + t.Fatalf("got request %v, expected %v", testRequestRetrieved, testRequest) + } + + txq.Start() + defer txq.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + if err = tc.expectPendingNotification(ctx, id); err != nil { + t.Fatal(err) + } + + if err = tc.expectReceiptNotification(ctx, id); err != nil { + t.Fatal(err) + } +} + +// TestTxQueueManyRequests schedules many requests and expects all of them to be successful +func TestTxQueueManyRequests(t *testing.T) { + txq, backend, clean := setupTxQueueTest(true) + defer clean() + tc, err := newTxSchedulerTester(backend, txq) + if err != nil { + t.Fatal(err) + } + + var ids []uint64 + count := 200 + for i := 0; i < count; i++ { + id, err := tc.schedule(makeTestRequest(), &txSchedulerTesterRequestExtraData{}, nil) + if err != nil { + t.Fatal(err) + } + + ids = append(ids, id) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for _, id := range ids { + err = tc.expectPendingNotification(ctx, id) + if err != nil { + t.Fatal(err) + } + err = tc.expectReceiptNotification(ctx, id) + if err != nil { + t.Fatal(err) + } + } +} + +// TestTxQueueActiveTransaction tests that the queue continues to monitor the last pending transaction +func TestTxQueueActiveTransaction(t *testing.T) { + txq, backend, clean := setupTxQueueTest(false) + defer clean() + + tc, err := newTxSchedulerTester(backend, txq) + if err != nil { + t.Fatal(err) + } + + txq.Start() + + id, err := tc.schedule(makeTestRequest(), 5, &testRequestOutcome{ + noCommit: true, + }) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err = tc.expectPendingNotification(ctx, id) + if err != nil { + t.Fatal(err) + } + + txq.Stop() + + state, err := txq.GetRequestState(id) + if err != nil { + t.Fatal(err) + } + if state != TxRequestStatePending { + t.Fatalf("state not pending, was %d", state) + } + + // start a new queue with the same store and backend + txq2 := NewTxQueue(txq.store, txq.prefix, txq.backend, txq.privateKey) + if err != nil { + t.Fatal(err) + } + // reuse the tester so it maintains state about the tx hash and id + tc.setHandlers(txq2) + + if err != nil { + t.Fatal(err) + } + + // the transaction confirmed in the meantime + backend.Commit() + + txq2.Start() + defer txq2.Stop() + + err = tc.expectReceiptNotification(ctx, id) + if err != nil { + t.Fatal(err) + } +} + +// TestTxQueueErrorDuringSend tests that a request is marked as TxRequestStateStatusUnknown if the send fails +func TestTxQueueErrorDuringSend(t *testing.T) { + txq, backend, clean := setupTxQueueTest(true) + defer clean() + tc, err := newTxSchedulerTester(backend, txq) + if err != nil { + t.Fatal(err) + } + + id, err := tc.schedule(makeTestRequest(), 5, &testRequestOutcome{ + sendError: errors.New("test error"), + }) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err = tc.expectStatusUnknownNotification(ctx, id, "test error") + if err != nil { + t.Fatal(err) + } +} diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go new file mode 100644 index 0000000000..4030b705a6 --- /dev/null +++ b/swap/chain/txscheduler.go @@ -0,0 +1,127 @@ +package chain + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// TxSchedulerBackend is an extension of the normal Backend interface +type TxSchedulerBackend interface { + Backend + // SendTransactionWithID is the same as SendTransaction but with the ID of the associated request passed alongside + // This is primarily used so the backend can react with the expected result during testing + SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error +} + +// DefaultTxSchedulerBackend is the standard backend that should be used +// It simply wraps another Backend +type DefaultTxSchedulerBackend struct { + Backend +} + +// SendTransactionWithID in the default Backend calls the underlying SendTransaction function +func (b *DefaultTxSchedulerBackend) SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error { + return b.Backend.SendTransaction(ctx, tx) +} + +// TxRequest describes a request for a transaction that can be scheduled +type TxRequest struct { + To common.Address // recipient of the transaction + Data []byte // transaction data + GasPrice *big.Int // gas price or nil if suggested gas price should be used + GasLimit uint64 // gas limit or 0 if it should be estimated + Value *big.Int // amount of wei to send +} + +// ToSignedTx returns a signed types.Transaction for the given request and nonce +func (request *TxRequest) ToSignedTx(nonce uint64, opts *bind.TransactOpts) (*types.Transaction, error) { + tx := types.NewTransaction( + nonce, + request.To, + request.Value, + request.GasLimit, + request.GasPrice, + request.Data, + ) + + return opts.Signer(&types.HomesteadSigner{}, opts.From, tx) +} + +// TxScheduler represents a central sender for all transactions from a single ethereum account +// its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result +// notifications are guaranteed to happen even across node restarts and disconnects from the ethereum backend +type TxScheduler interface { + // SetHandlers registers the handlers for the given handlerID + // This starts the delivery of notifications for this handlerID + SetHandlers(handlerID string, handlers *TxRequestHandlers) error + // ScheduleRequest adds a new request to be processed + // The request is assigned an id which is returned + ScheduleRequest(handlerID string, request TxRequest, requestExtraData interface{}) (id uint64, err error) + // GetExtraData load the serialized extra data for this request from disk and tries to decode it + GetExtraData(id uint64, request interface{}) error + // GetRequestState gets the state the request is currently in + GetRequestState(id uint64) (TxRequestState, error) + // Start starts processing transactions if it is not already doing so + // This cannot be used to restart the queue once stopped + Start() + // Stop stops processing transactions if it is running + // It will block until processing has terminated + Stop() +} + +// TxRequestHandlers holds all the callbacks for a given string +// Any of the functions may be nil +// Notify functions are called by the transaction queue when a notification for a transaction occurs +// If the handler returns an error the notification will be resent in the future (including across restarts) +type TxRequestHandlers struct { + // NotifyReceipt is called the first time a receipt is observed for a transaction + NotifyReceipt func(ctx context.Context, id uint64, notification *TxReceiptNotification) error + // NotifyPending is called after the transaction was successfully sent to the backend + NotifyPending func(ctx context.Context, id uint64, notification *TxPendingNotification) error + // NotifyCancelled is called when it is certain that this transaction will never be sent + NotifyCancelled func(ctx context.Context, id uint64, notification *TxCancelledNotification) error + // NotifyStatusUnknown is called if it cannot be determined if the transaction might be confirmed + NotifyStatusUnknown func(ctx context.Context, id uint64, notification *TxStatusUnknownNotification) error +} + +// TxReceiptNotification is the notification emitted when the receipt is available +type TxReceiptNotification struct { + Receipt types.Receipt // the receipt of the included transaction +} + +// TxCancelledNotification is the notification emitted when it is certain that a transaction will never be sent +type TxCancelledNotification struct { + Reason string // The reason behind the cancellation +} + +// TxStatusUnknownNotification is the notification emitted if it cannot be determined if the transaction might be confirmed +type TxStatusUnknownNotification struct { + Reason string // The reason why it is unknown +} + +// TxPendingNotification is the notification emitted after the transaction was successfully sent to the backend +type TxPendingNotification struct { + Transaction *types.Transaction // The transaction that was sent +} + +// TxRequestState is the type used to indicate which state the transaction is in +type TxRequestState uint8 + +const ( + // TxRequestStateScheduled is the initial state for all requests that enter the queue + TxRequestStateScheduled TxRequestState = 0 + // TxRequestStateSigned means the transaction has been generated and signed but not yet sent + TxRequestStateSigned TxRequestState = 1 + // TxRequestStatePending means the transaction has been sent but is not yet confirmed + TxRequestStatePending TxRequestState = 2 + // TxRequestStateConfirmed is entered the first time a confirmation is received + TxRequestStateConfirmed TxRequestState = 3 + // TxRequestStateStatusUnknown is used for all cases where it is unclear wether the transaction was broadcast or not. This is also used for timed-out transactions. + TxRequestStateStatusUnknown TxRequestState = 4 + // TxRequestStateCancelled is used for all cases where it is certain the transaction was and never will be sent + TxRequestStateCancelled TxRequestState = 5 +) From 145097c7b9ff943392dcb2d9827807aea581b942 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 3 Mar 2020 19:22:52 +0100 Subject: [PATCH 03/18] swap, swap/chain, contract/swap: use txqueue for cashout --- contracts/swap/swap.go | 34 +++++++--- swap/cashout.go | 137 +++++++++++++++++++++++---------------- swap/cashout_test.go | 93 ++++++++++++++++++++------ swap/common_test.go | 83 ++++++++++++++---------- swap/protocol_test.go | 11 +--- swap/simulations_test.go | 23 ++++--- swap/swap.go | 63 +++++++++--------- swap/swap_test.go | 31 ++------- 8 files changed, 277 insertions(+), 198 deletions(-) diff --git a/contracts/swap/swap.go b/contracts/swap/swap.go index fec90fd99a..87eb9ef874 100644 --- a/contracts/swap/swap.go +++ b/contracts/swap/swap.go @@ -22,7 +22,9 @@ package swap import ( "fmt" "math/big" + "strings" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -37,8 +39,8 @@ type Contract interface { Withdraw(auth *bind.TransactOpts, amount *big.Int) (*types.Receipt, error) // Deposit sends a raw transaction to the chequebook, triggering the fallback—depositing amount Deposit(auth *bind.TransactOpts, amout *big.Int) (*types.Receipt, error) - // CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary - CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error) + // CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction + CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error) // CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction CashChequeBeneficiaryResult(receipt *types.Receipt) *CashChequeResult // LiquidBalance returns the LiquidBalance (total balance in ERC20-token - total hard deposits in ERC20-token) of the chequebook @@ -75,6 +77,7 @@ type Params struct { type simpleContract struct { instance *contract.ERC20SimpleSwap + abi abi.ABI address common.Address backend chain.Backend } @@ -87,7 +90,17 @@ func InstanceAt(address common.Address, backend chain.Backend) (Contract, error) if err != nil { return nil, err } - c := simpleContract{instance: instance, address: address, backend: backend} + + contractABI, err := abi.JSON(strings.NewReader(contract.ERC20SimpleSwapABI)) + if err != nil { + return nil, err + } + c := simpleContract{ + abi: contractABI, + instance: instance, + address: address, + backend: backend, + } return c, err } @@ -130,15 +143,20 @@ func (s simpleContract) Deposit(auth *bind.TransactOpts, amount *big.Int) (*type return chain.WaitMined(auth.Context, s.backend, tx.Hash()) } -// CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary -func (s simpleContract) CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error) { +// CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction +func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error) { payout := cumulativePayout.Value() - // send a copy of cumulativePayout to instance as it modifies the supplied big int internally - tx, err := s.instance.CashChequeBeneficiary(opts, beneficiary, big.NewInt(0).Set(&payout), ownerSig) + data, err := s.abi.Pack("cashChequeBeneficiary", beneficiary, big.NewInt(0).Set(&payout), ownerSig) if err != nil { return nil, err } - return tx, nil + + return &chain.TxRequest{ + To: s.address, + Value: big.NewInt(0), + GasLimit: 200000, + Data: data, + }, nil } // CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction diff --git a/swap/cashout.go b/swap/cashout.go index 9d62250c64..5a136306e2 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" contract "github.com/ethersphere/swarm/contracts/swap" "github.com/ethersphere/swarm/swap/chain" @@ -30,11 +31,8 @@ import ( // CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction const CashChequeBeneficiaryTransactionCost = 50000 -// CashoutProcessor holds all relevant fields needed for processing cashouts -type CashoutProcessor struct { - backend chain.Backend // ethereum backend to use - privateKey *ecdsa.PrivateKey // private key to use -} +// CashoutRequestHandlerID is the handlerID used by the CashoutProcessor for CashoutRequests +const CashoutRequestHandlerID = "CashoutProcessor_CashoutRequest" // CashoutRequest represents a request for a cashout operation type CashoutRequest struct { @@ -42,42 +40,95 @@ type CashoutRequest struct { Destination common.Address // destination for the payout } -// ActiveCashout stores the necessary information for a cashout in progess -type ActiveCashout struct { - Request CashoutRequest // the request that caused this cashout - TransactionHash common.Hash // the hash of the current transaction for this request +// CashoutProcessor holds all relevant fields needed for processing cashouts +type CashoutProcessor struct { + backend chain.Backend // ethereum backend to use + txScheduler chain.TxScheduler // transaction queue to use + cashoutResultHandler CashoutResultHandler + cashoutDone chan *CashoutRequest +} + +// CashoutResultHandler is an interface which accepts CashChequeResults from a CashoutProcessor +type CashoutResultHandler interface { + // Called by the CashoutProcessor when a CashoutRequest was successfully executed + // It will be called again if an error is returned + HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error } // newCashoutProcessor creates a new instance of CashoutProcessor -func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { - return &CashoutProcessor{ - backend: backend, - privateKey: privateKey, +func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler) *CashoutProcessor { + c := &CashoutProcessor{ + backend: backend, + txScheduler: txScheduler, + cashoutResultHandler: cashoutResultHandler, } -} -// cashCheque tries to cash the cheque specified in the request -// after the transaction is sent it waits on its success -func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error { - cheque := request.Cheque - opts := bind.NewKeyedTransactor(c.privateKey) - opts.Context = ctx + txScheduler.SetHandlers(CashoutRequestHandlerID, &chain.TxRequestHandlers{ + NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error { + var request *CashoutRequest + err := c.txScheduler.GetExtraData(id, &request) + if err != nil { + return err + } + + otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend) + if err != nil { + return err + } + + receipt := ¬ification.Receipt + if receipt.Status == 0 { + swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + return nil + } + + result := otherSwap.CashChequeBeneficiaryResult(receipt) + return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt) + }, + }) + return c +} - otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) +// submitCheque submits a cheque for cashout +// the cheque might not be cashed if it is not deemed profitable +func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) { + expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque) if err != nil { - return err + swapLog.Error("could not estimate payout", "error", err) + return } - tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + costsMultiplier := uint256.FromUint64(2) + costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) if err != nil { - return err + swapLog.Error("overflow in transaction fee", "error", err) + return } - // this blocks until the cashout has been successfully processed - return c.waitForAndProcessActiveCashout(&ActiveCashout{ - Request: *request, - TransactionHash: tx.Hash(), - }) + // do a payout transaction if we get 2 times the gas costs + if expectedPayout.Cmp(costThreshold) == 1 { + swapLog.Info("queueing cashout", "cheque", &request.Cheque) + + cheque := request.Cheque + otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) + if err != nil { + swapLog.Error("could not get swap instance", "error", err) + return + } + + txRequest, err := otherSwap.CashChequeBeneficiaryRequest(cheque.Beneficiary, cheque.CumulativePayout, cheque.Signature) + if err != nil { + metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) + swapLog.Error("cashing cheque:", "error", err) + return + } + + _, err = c.txScheduler.ScheduleRequest(CashoutRequestHandlerID, *txRequest, request) + if err != nil { + metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) + swapLog.Error("cashing cheque:", "error", err) + } + } } // estimatePayout estimates the payout for a given cheque as well as the transaction cost @@ -123,31 +174,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) ( return expectedPayout, transactionCosts, nil } - -// waitForAndProcessActiveCashout waits for activeCashout to complete -func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error { - ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout) - defer cancel() - - receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash) - if err != nil { - return err - } - - otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend) - if err != nil { - return err - } - - result := otherSwap.CashChequeBeneficiaryResult(receipt) - - metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) - - if result.Bounced { - metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) - swapLog.Warn("cheque bounced", "tx", receipt.TxHash) - } - - swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey) - return nil -} diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 18d56da4ce..7827ee1100 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -19,9 +19,10 @@ package swap import ( "context" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/log" + "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/swap/chain" "github.com/ethersphere/swarm/uint256" ) @@ -33,11 +34,13 @@ import ( // afterwards it attempts to cash-in a bouncing cheque func TestContractIntegration(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() payout := uint256.FromUint64(42) - chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) + chequebook, err := testDeployWithPrivateKey(ctx, backend, ownerKey, ownerAddress, payout) if err != nil { t.Fatal(err) } @@ -49,20 +52,35 @@ func TestContractIntegration(t *testing.T) { opts := bind.NewKeyedTransactor(beneficiaryKey) - tx, err := chequebook.CashChequeBeneficiaryStart(opts, beneficiaryAddress, payout, cheque.Signature) + txRequest, err := chequebook.CashChequeBeneficiaryRequest(beneficiaryAddress, payout, cheque.Signature) if err != nil { t.Fatal(err) } - receipt, err := chain.WaitMined(nil, backend, tx.Hash()) + nonce, err := backend.PendingNonceAt(ctx, opts.From) + if err != nil { + t.Fatal(err) + } + + tx, err := txRequest.ToSignedTx(nonce, opts) + if err != nil { + t.Fatal(err) + } + + err = backend.SendTransaction(ctx, tx) + if err != nil { + t.Fatal(err) + } + + receipt, err := chain.WaitMined(ctx, backend, tx.Hash()) if err != nil { t.Fatal(err) } - cashResult := chequebook.CashChequeBeneficiaryResult(receipt) if receipt.Status != 1 { t.Fatalf("Bad status %d", receipt.Status) } + cashResult := chequebook.CashChequeBeneficiaryResult(receipt) if cashResult.Bounced { t.Fatal("cashing bounced") } @@ -80,7 +98,6 @@ func TestContractIntegration(t *testing.T) { if !cheque.CumulativePayout.Equals(paidOut) { t.Fatalf("Wrong cumulative payout %v", paidOut) } - log.Debug("cheques result", "result", result) // create a cheque that will bounce _, err = payout.Add(payout, uint256.FromUint64(10000*RetrieveRequestPrice)) @@ -93,7 +110,22 @@ func TestContractIntegration(t *testing.T) { t.Fatal(err) } - tx, err = chequebook.CashChequeBeneficiaryStart(opts, beneficiaryAddress, bouncingCheque.CumulativePayout, bouncingCheque.Signature) + txRequest, err = chequebook.CashChequeBeneficiaryRequest(beneficiaryAddress, bouncingCheque.CumulativePayout, bouncingCheque.Signature) + if err != nil { + t.Fatal(err) + } + + nonce, err = backend.PendingNonceAt(ctx, opts.From) + if err != nil { + t.Fatal(err) + } + + tx, err = txRequest.ToSignedTx(nonce, opts) + if err != nil { + t.Fatal(err) + } + + err = backend.SendTransaction(ctx, tx) if err != nil { t.Fatal(err) } @@ -113,14 +145,23 @@ func TestContractIntegration(t *testing.T) { } -// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque +// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.submitCheque func TestCashCheque(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) - payout := uint256.FromUint64(42) + store := state.NewInmemoryStore() + defer store.Close() + + transactionQueue := chain.NewTxQueue(store, "queue", &chain.DefaultTxSchedulerBackend{ + Backend: backend, + }, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + + cashoutHandler := newTestCashoutResultHandler(nil) + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler) + payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) if err != nil { @@ -132,12 +173,14 @@ func TestCashCheque(t *testing.T) { t.Fatal(err) } - err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ + cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{ Cheque: *testCheque, Destination: ownerAddress, }) - if err != nil { - t.Fatal(err) + + select { + case <-cashoutHandler.cashChequeDone: + case <-time.After(5 * time.Second): } paidOut, err := chequebook.PaidOut(nil, ownerAddress) @@ -154,12 +197,20 @@ func TestCashCheque(t *testing.T) { // TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout func TestEstimatePayout(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) - payout := uint256.FromUint64(42) + store := state.NewInmemoryStore() + defer store.Close() + + transactionQueue := chain.NewTxQueue(store, "queue", &chain.DefaultTxSchedulerBackend{ + Backend: backend, + }, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{}) + + payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) if err != nil { t.Fatal(err) diff --git a/swap/common_test.go b/swap/common_test.go index f7b7fbcce6..28cb3aed82 100644 --- a/swap/common_test.go +++ b/swap/common_test.go @@ -15,10 +15,12 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" contractFactory "github.com/ethersphere/go-sw3/contracts-v0-2-0/simpleswapfactory" + contract "github.com/ethersphere/swarm/contracts/swap" cswap "github.com/ethersphere/swarm/contracts/swap" "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" @@ -34,8 +36,6 @@ type swapTestBackend struct { *mock.TestBackend factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network tokenAddress common.Address // address of the token in the simulated network - // the async cashing go routine needs synchronization for tests - cashDone chan struct{} } var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ @@ -67,7 +67,6 @@ func newTestBackend(t *testing.T) *swapTestBackend { TestBackend: backend, factoryAddress: factoryAddress, tokenAddress: tokenAddress, - cashDone: make(chan struct{}), } } @@ -105,7 +104,11 @@ func newBaseTestSwapWithParams(t *testing.T, key *ecdsa.PrivateKey, params *Para if err != nil { t.Fatal(err) } - swap := newSwapInstance(stateStore, owner, backend, 10, params, factory) + + txqueue := chain.NewTxQueue(stateStore, "chain", &chain.DefaultTxSchedulerBackend{ + Backend: backend, + }, owner.privateKey) + swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, txqueue) return swap, dir } @@ -126,6 +129,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend) usedBackend = newTestBackend(t) } swap, dir := newBaseTestSwap(t, key, usedBackend) + swap.txScheduler.Start() clean := func() { swap.Close() // only close if created by newTestSwap to avoid double close @@ -206,32 +210,6 @@ func newRandomTestCheque() *Cheque { return cheque } -// During tests, because the cashing in of cheques is async, we should wait for the function to be returned -// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`, -// causing subsequent TX to possibly fail due to nonce mismatch -func testCashCheque(s *Swap, cheque *Cheque) { - cashCheque(s, cheque) - // send to the channel, signals to clients that this function actually finished - if stb, ok := s.backend.(*swapTestBackend); ok { - if stb.cashDone != nil { - stb.cashDone <- struct{}{} - } - } -} - -// setupContractTest is a helper function for setting up the -// blockchain wait function for testing -func setupContractTest() func() { - // we also need to store the previous cashCheque function in case this is called multiple times - currentCashCheque := defaultCashCheque - defaultCashCheque = testCashCheque - // overwrite only for the duration of the test, so... - return func() { - // ...we need to set it back to original when done - defaultCashCheque = currentCashCheque - } -} - // deploy for testing (needs simulated backend commit) func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) { opts := bind.NewKeyedTransactor(privateKey) @@ -248,9 +226,6 @@ func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privat return nil, err } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() contract, err := factory.DeploySimpleSwap(opts, ownerAddress, big.NewInt(int64(defaultHarddepositTimeoutDuration))) if err != nil { return nil, err @@ -315,3 +290,45 @@ func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) { func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error { return nil } + +// struct used by the testCashoutResultHandler +type cashChequeDoneData struct { + request *CashoutRequest + result *contract.CashChequeResult + receipt *types.Receipt +} + +// testCashoutResultHandler is a CashoutResultHandler which writes to a channel after forwarding the result to swap +type testCashoutResultHandler struct { + swap *Swap + cashChequeDone chan cashChequeDoneData +} + +func newTestCashoutResultHandler(swap *Swap) *testCashoutResultHandler { + return &testCashoutResultHandler{ + swap: swap, + cashChequeDone: make(chan cashChequeDoneData), + } +} + +// HandleCashoutResult forwards the result to swap if set and afterwards sends it to its channel +func (h *testCashoutResultHandler) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { + if h.swap != nil { + if err := h.swap.HandleCashoutResult(request, result, receipt); err != nil { + return err + } + } + h.cashChequeDone <- cashChequeDoneData{ + request: request, + result: result, + receipt: receipt, + } + return nil +} + +// helper function to override the cashoutHandler for a cashout processor of swap instance +func overrideCashoutResultHandler(swap *Swap) *testCashoutResultHandler { + cashoutResultHandler := newTestCashoutResultHandler(swap) + swap.cashoutProcessor.cashoutResultHandler = cashoutResultHandler + return cashoutResultHandler +} diff --git a/swap/protocol_test.go b/swap/protocol_test.go index ca120ae9d6..56f2a59b99 100644 --- a/swap/protocol_test.go +++ b/swap/protocol_test.go @@ -49,7 +49,6 @@ type swapTester struct { swap *Swap } -// creates a new protocol tester for swap with a deployed chequebook func newSwapTester(t *testing.T, backend *swapTestBackend, depositAmount *uint256.Uint256) (*swapTester, func(), error) { swap, clean := newTestSwap(t, ownerKey, backend) @@ -229,14 +228,11 @@ func TestEmitCheque(t *testing.T) { t.Fatal(err) } creditorSwap := protocolTester.swap + cashoutHandler := overrideCashoutResultHandler(creditorSwap) debitorSwap, cleanDebitorSwap := newTestSwap(t, beneficiaryKey, testBackend) defer cleanDebitorSwap() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - log.Debug("deploy to simulated backend") // cashCheque cashes a cheque when the reward of doing so is twice the transaction costs. @@ -317,7 +313,7 @@ func TestEmitCheque(t *testing.T) { // we wait until the cashCheque is actually terminated (ensures proper nonce count) select { - case <-testBackend.cashDone: + case <-cashoutHandler.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transaction to complete") @@ -340,9 +336,6 @@ func TestTriggerPaymentThreshold(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() if err = protocolTester.testHandshake( correctSwapHandshakeMsg(debitorSwap), diff --git a/swap/simulations_test.go b/swap/simulations_test.go index e2222da8b7..8aea683fa5 100644 --- a/swap/simulations_test.go +++ b/swap/simulations_test.go @@ -47,6 +47,7 @@ import ( "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/chain" mock "github.com/ethersphere/swarm/swap/chain/mock" "github.com/ethersphere/swarm/uint256" ) @@ -62,6 +63,7 @@ For integration tests, run test cluster deployments with all integration moduele (blockchains, oracles, etc.) */ // swapSimulationParams allows to avoid global variables for the test + type swapSimulationParams struct { swaps map[int]*Swap dirs map[int]string @@ -165,9 +167,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic if err != nil { return nil, nil, err } + ts.swap.cashoutProcessor.txScheduler.Start() cleanup = func() { - ts.swap.store.Close() + ts.swap.Close() os.RemoveAll(dir) } @@ -238,7 +241,6 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, TestBackend: mock.NewTestBackend(defaultBackend), factoryAddress: factoryAddress, tokenAddress: tokenAddress, - cashDone: make(chan struct{}), } // finally, create all Swap instances for each node, which share the same backend var owner *Owner @@ -249,7 +251,10 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, if err != nil { t.Fatal(err) } - params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory) + txqueue := chain.NewTxQueue(stores[i], "chain", &chain.DefaultTxSchedulerBackend{ + Backend: testBackend, + }, owner.privateKey) + params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory, txqueue) } params.backend = testBackend @@ -271,10 +276,6 @@ func TestMultiChequeSimulation(t *testing.T) { // cleanup backend defer params.backend.Close() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // initialize the simulation sim := simulation.NewBzzInProc(newSimServiceMap(params), false) defer sim.Close() @@ -306,6 +307,8 @@ func TestMultiChequeSimulation(t *testing.T) { // get the testService for the creditor creditorSvc := sim.Service("swap", creditor).(*testService) + cashoutHandler := overrideCashoutResultHandler(creditorSvc.swap) + var debLen, credLen, debSwapLen, credSwapLen int timeout := time.After(10 * time.Second) for { @@ -384,7 +387,7 @@ func TestMultiChequeSimulation(t *testing.T) { balanceAfterMessage := debitorBalance - int64(msgPrice) if balanceAfterMessage <= -paymentThreshold { // we need to wait a bit in order to give time for the cheque to be processed - if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil { + if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout, cashoutHandler.cashChequeDone); err != nil { t.Fatal(err) } expectedPayout += uint64(-balanceAfterMessage) @@ -620,7 +623,7 @@ func TestBasicSwapSimulation(t *testing.T) { log.Info("Simulation ended") } -func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64) error { +func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64, cashChequeDone chan cashChequeDoneData) error { t.Helper() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() @@ -643,7 +646,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr lock.Unlock() wg.Done() return - case <-backend.cashDone: + case <-cashChequeDone: wg.Done() return } diff --git a/swap/swap.go b/swap/swap.go index 274c3a29cd..f638ee66e4 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/console" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" @@ -68,6 +69,7 @@ type Swap struct { chequebookFactory contract.SimpleSwapFactory // the chequebook factory used honeyPriceOracle HoneyOracle // oracle which resolves the price of honey (in Wei) cashoutProcessor *CashoutProcessor // processor for cashing out + txScheduler chain.TxScheduler // transaction scheduler to use } // Owner encapsulates information related to accessing the contract @@ -136,8 +138,8 @@ func swapRotatingFileHandler(logdir string) (log.Handler, error) { } // newSwapInstance is a swap constructor function without integrity checks -func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory) *Swap { - return &Swap{ +func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory, txScheduler chain.TxScheduler) *Swap { + s := &Swap{ store: stateStore, peers: make(map[enode.ID]*Peer), backend: backend, @@ -146,8 +148,10 @@ func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend chequebookFactory: chequebookFactory, honeyPriceOracle: NewHoneyPriceOracle(), chainID: chainID, - cashoutProcessor: newCashoutProcessor(backend, owner.privateKey), + txScheduler: txScheduler, } + s.cashoutProcessor = newCashoutProcessor(txScheduler, backend, owner.privateKey, s) + return s } // New prepares and creates all fields to create a swap instance: @@ -209,12 +213,17 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par chainID.Uint64(), params, factory, + chain.NewTxQueue(stateStore, "chain", &chain.DefaultTxSchedulerBackend{ + Backend: backend, + }, owner.privateKey), ) // start the chequebook if swap.contract, err = swap.StartChequebook(chequebookAddressFlag); err != nil { return nil, err } + swap.txScheduler.Start() + // deposit money in the chequebook if desired if !skipDepositFlag { // prompt the user for a depositAmount @@ -391,8 +400,6 @@ func (s *Swap) handleMsg(p *Peer) func(ctx context.Context, msg interface{}) err } } -var defaultCashCheque = cashCheque - // handleEmitChequeMsg should be handled by the creditor when it receives // a cheque from a debitor func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitChequeMsg) error { @@ -435,21 +442,10 @@ func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitCheque return protocols.Break(err) } - expectedPayout, transactionCosts, err := s.cashoutProcessor.estimatePayout(context.TODO(), cheque) - if err != nil { - return protocols.Break(err) - } - - costsMultiplier := uint256.FromUint64(2) - costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) - if err != nil { - return err - } - - // do a payout transaction if we get 2 times the gas costs - if expectedPayout.Cmp(costThreshold) == 1 { - go defaultCashCheque(s, cheque) - } + s.cashoutProcessor.submitCheque(ctx, &CashoutRequest{ + Cheque: *cheque, + Destination: s.GetParams().ContractAddress, + }) return nil } @@ -489,20 +485,6 @@ func (s *Swap) handleConfirmChequeMsg(ctx context.Context, p *Peer, msg *Confirm return nil } -// cashCheque should be called async as it blocks until the transaction(s) are mined -// The function cashes the cheque by sending it to the blockchain -func cashCheque(s *Swap, cheque *Cheque) { - err := s.cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ - Cheque: *cheque, - Destination: s.GetParams().ContractAddress, - }) - - if err != nil { - metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - swapLog.Error("cashing cheque:", "error", err) - } -} - // processAndVerifyCheque verifies the cheque and compares it with the last received cheque // if the cheque is valid it will also be saved as the new last cheque // the caller is expected to hold p.lock @@ -613,6 +595,7 @@ func (s *Swap) saveBalance(p enode.ID, balance int64) error { // Close cleans up swap func (s *Swap) Close() error { + s.txScheduler.Stop() return s.store.Close() } @@ -737,3 +720,15 @@ func (s *Swap) loadChequebook() (common.Address, error) { func (s *Swap) saveChequebook(chequebook common.Address) error { return s.store.Put(connectedChequebookKey, chequebook) } + +func (s *Swap) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { + metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) + + if result.Bounced { + metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) + swapLog.Warn("cheque bounced", "tx", receipt.TxHash) + } + + swapLog.Info("cheque cashed", "cheque", &request.Cheque) + return nil +} diff --git a/swap/swap_test.go b/swap/swap_test.go index 01bf17d142..6f5fa94104 100644 --- a/swap/swap_test.go +++ b/swap/swap_test.go @@ -603,6 +603,7 @@ func TestResetBalance(t *testing.T) { defer testBackend.Close() // create both test swap accounts creditorSwap, clean1 := newTestSwap(t, beneficiaryKey, testBackend) + cashoutHandler := overrideCashoutResultHandler(creditorSwap) debitorSwap, clean2 := newTestSwap(t, ownerKey, testBackend) defer clean1() defer clean2() @@ -641,10 +642,6 @@ func TestResetBalance(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -663,20 +660,18 @@ func TestResetBalance(t *testing.T) { if cheque == nil { t.Fatal("expected to find a cheque, but it was empty") } - // ...create a message... - msg := &EmitChequeMsg{ - Cheque: cheque, - } // ...and trigger message handling on the receiver side (creditor) // remember that debitor is the model of the remote node for the creditor... - err = creditorSwap.handleEmitChequeMsg(ctx, debitor, msg) + err = creditorSwap.handleEmitChequeMsg(ctx, debitor, &EmitChequeMsg{ + Cheque: cheque, + }) if err != nil { t.Fatal(err) } // ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count) select { - case <-testBackend.cashDone: + case <-cashoutHandler.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transactions to complete") @@ -692,8 +687,6 @@ func TestResetBalance(t *testing.T) { func TestDebtCheques(t *testing.T) { testBackend := newTestBackend(t) defer testBackend.Close() - cleanup := setupContractTest() - defer cleanup() creditorSwap, cleanup := newTestSwap(t, beneficiaryKey, testBackend) defer cleanup() @@ -745,14 +738,6 @@ func TestDebtCheques(t *testing.T) { if err != nil { t.Fatal(err) } - - // ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count) - select { - case <-testBackend.cashDone: - log.Debug("cash transaction completed and committed") - case <-time.After(4 * time.Second): - t.Fatalf("Timeout waiting for cash transactions to complete") - } } // generate bookings based on parameters, apply them to a Swap struct and verify the result @@ -1303,10 +1288,6 @@ func TestSwapLogToFile(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -1360,8 +1341,6 @@ func TestAvailableBalance(t *testing.T) { defer testBackend.Close() swap, clean := newTestSwap(t, ownerKey, testBackend) defer clean() - cleanup := setupContractTest() - defer cleanup() depositAmount := uint256.FromUint64(9000 * RetrieveRequestPrice) From f6cdcab09dcf71e90b8739fde2dbdc365cfdd00c Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 6 Mar 2020 09:09:22 +0100 Subject: [PATCH 04/18] swap, swap/chain, contracts/swap: address pr comments --- contracts/swap/swap.go | 6 +++--- swap/cashout.go | 3 +-- swap/cashout_test.go | 1 + swap/chain/persistentqueue.go | 4 ++-- swap/chain/txqueue.go | 8 ++++---- swap/chain/txscheduler.go | 4 +++- swap/swap.go | 1 + 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/contracts/swap/swap.go b/contracts/swap/swap.go index 87eb9ef874..ec3fe845e0 100644 --- a/contracts/swap/swap.go +++ b/contracts/swap/swap.go @@ -83,7 +83,7 @@ type simpleContract struct { } // InstanceAt creates a new instance of a contract at a specific address. -// It assumes that there is an existing contract instance at the given address, or an error is returned +// It assumes that there is an existing contract instance at the given address // This function is needed to communicate with remote Swap contracts (e.g. sending a cheque) func InstanceAt(address common.Address, backend chain.Backend) (Contract, error) { instance, err := contract.NewERC20SimpleSwap(address, backend) @@ -146,7 +146,7 @@ func (s simpleContract) Deposit(auth *bind.TransactOpts, amount *big.Int) (*type // CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error) { payout := cumulativePayout.Value() - data, err := s.abi.Pack("cashChequeBeneficiary", beneficiary, big.NewInt(0).Set(&payout), ownerSig) + callData, err := s.abi.Pack("cashChequeBeneficiary", beneficiary, big.NewInt(0).Set(&payout), ownerSig) if err != nil { return nil, err } @@ -155,7 +155,7 @@ func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, To: s.address, Value: big.NewInt(0), GasLimit: 200000, - Data: data, + Data: callData, }, nil } diff --git a/swap/cashout.go b/swap/cashout.go index 5a136306e2..079a6eb802 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -45,7 +45,6 @@ type CashoutProcessor struct { backend chain.Backend // ethereum backend to use txScheduler chain.TxScheduler // transaction queue to use cashoutResultHandler CashoutResultHandler - cashoutDone chan *CashoutRequest } // CashoutResultHandler is an interface which accepts CashChequeResults from a CashoutProcessor @@ -105,7 +104,7 @@ func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutReq return } - // do a payout transaction if we get 2 times the gas costs + // do a payout transaction if we get more than 2 times the gas costs if expectedPayout.Cmp(costThreshold) == 1 { swapLog.Info("queueing cashout", "cheque", &request.Cheque) diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 7827ee1100..71528f3862 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -181,6 +181,7 @@ func TestCashCheque(t *testing.T) { select { case <-cashoutHandler.cashChequeDone: case <-time.After(5 * time.Second): + t.Fatal("cheque was not cashed within timeout") } paidOut, err := chequebook.PaidOut(nil, ownerAddress) diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go index e6695f2f19..583967ed5c 100644 --- a/swap/chain/persistentqueue.go +++ b/swap/chain/persistentqueue.go @@ -14,8 +14,8 @@ import ( /* PersistentQueue represents a queue stored in a state store - Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce in case two items get queued at the same time - It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time + Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce so that two items can be queued at the same time + It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time To allow atomic operations with other state store operations all functions only write to batches instead of writing to the store directly The user must ensure that all functions (except Next) are called with the same lock held which is provided externally so multiple queues can use the same The queue provides no dequeue function. Instead an item must be deleted by its key diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index f8dac5585a..23d4ec7516 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -196,7 +196,7 @@ func (txq *TxQueue) Start() { go func() { defer txq.wg.Done() // run the actual loop - err := txq.loop() + err := txq.processQueue() if err != nil && !errors.Is(err, context.Canceled) { log.Error("transaction queue terminated with an error", "queue", txq.prefix, "error", err) } @@ -357,7 +357,7 @@ func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, return triggerNotifyQueue, nil } -// waitForNextRequestLegacy waits for the next request and sets it as the active request +// waitForNextRequest waits for the next request and sets it as the active request // the txqueue lock must not be held func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err error) { var id uint64 @@ -610,10 +610,10 @@ func (txq *TxQueue) waitForActiveTransaction(requestMetadata *txRequestData) err return txq.finalizeRequestConfirmed(requestMetadata, *receipt) } -// loop is the main transaction processing function of the TxQueue +// processQueue is the main transaction processing function of the TxQueue // first it checks if there already is an active request. If so it processes this first // then it will take requests from the queue in a loop and execute those -func (txq *TxQueue) loop() error { +func (txq *TxQueue) processQueue() error { err := txq.processActiveRequest() if err != nil { return err diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go index 4030b705a6..f2b1bdaa9e 100644 --- a/swap/chain/txscheduler.go +++ b/swap/chain/txscheduler.go @@ -54,6 +54,7 @@ func (request *TxRequest) ToSignedTx(nonce uint64, opts *bind.TransactOpts) (*ty // TxScheduler represents a central sender for all transactions from a single ethereum account // its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result // notifications are guaranteed to happen even across node restarts and disconnects from the ethereum backend +// the account managed by this scheduler must not be used from anywhere else type TxScheduler interface { // SetHandlers registers the handlers for the given handlerID // This starts the delivery of notifications for this handlerID @@ -61,7 +62,7 @@ type TxScheduler interface { // ScheduleRequest adds a new request to be processed // The request is assigned an id which is returned ScheduleRequest(handlerID string, request TxRequest, requestExtraData interface{}) (id uint64, err error) - // GetExtraData load the serialized extra data for this request from disk and tries to decode it + // GetExtraData loads the serialized extra data for this request from disk and tries to decode it GetExtraData(id uint64, request interface{}) error // GetRequestState gets the state the request is currently in GetRequestState(id uint64) (TxRequestState, error) @@ -79,6 +80,7 @@ type TxScheduler interface { // If the handler returns an error the notification will be resent in the future (including across restarts) type TxRequestHandlers struct { // NotifyReceipt is called the first time a receipt is observed for a transaction + // This happens the first time a transaction was included in a block NotifyReceipt func(ctx context.Context, id uint64, notification *TxReceiptNotification) error // NotifyPending is called after the transaction was successfully sent to the backend NotifyPending func(ctx context.Context, id uint64, notification *TxPendingNotification) error diff --git a/swap/swap.go b/swap/swap.go index f638ee66e4..c56848a61e 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -721,6 +721,7 @@ func (s *Swap) saveChequebook(chequebook common.Address) error { return s.store.Put(connectedChequebookKey, chequebook) } +// HandleCashoutResult is the handler function called by the CashoutProcessor in cae of a successful cashing transaction func (s *Swap) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) From 97ce00bb1fc6f0ae53d10aea83b5aa15b8ddd871 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 6 Mar 2020 09:12:48 +0100 Subject: [PATCH 05/18] swap: fix a spelling mistake --- swap/swap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swap/swap.go b/swap/swap.go index c56848a61e..1fe1433578 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -721,7 +721,7 @@ func (s *Swap) saveChequebook(chequebook common.Address) error { return s.store.Put(connectedChequebookKey, chequebook) } -// HandleCashoutResult is the handler function called by the CashoutProcessor in cae of a successful cashing transaction +// HandleCashoutResult is the handler function called by the CashoutProcessor in case of a successful cashing transaction func (s *Swap) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) From 8a3d48c1405c755f68bd0f455a977c3f7c0b7e5b Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 6 Mar 2020 21:45:36 +0100 Subject: [PATCH 06/18] swap: don't export logger in cashout processor --- swap/cashout.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/swap/cashout.go b/swap/cashout.go index 317c1f2af9..d6fc715f3e 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -46,7 +46,7 @@ type CashoutProcessor struct { backend chain.Backend // ethereum backend to use txScheduler chain.TxScheduler // transaction queue to use cashoutResultHandler CashoutResultHandler - Logger Logger + logger Logger } // CashoutResultHandler is an interface which accepts CashChequeResults from a CashoutProcessor @@ -62,7 +62,7 @@ func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, p backend: backend, txScheduler: txScheduler, cashoutResultHandler: cashoutResultHandler, - Logger: logger, + logger: logger, } txScheduler.SetHandlers(CashoutRequestHandlerID, &chain.TxRequestHandlers{ @@ -80,7 +80,7 @@ func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, p receipt := ¬ification.Receipt if receipt.Status == 0 { - c.Logger.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + c.logger.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) return nil } @@ -96,39 +96,39 @@ func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, p func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) { expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque) if err != nil { - c.Logger.Error(CashChequeAction, "could not estimate payout", "error", err) + c.logger.Error(CashChequeAction, "could not estimate payout", "error", err) return } costsMultiplier := uint256.FromUint64(2) costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) if err != nil { - c.Logger.Error(CashChequeAction, "overflow in transaction fee", "error", err) + c.logger.Error(CashChequeAction, "overflow in transaction fee", "error", err) return } // do a payout transaction if we get more than 2 times the gas costs if expectedPayout.Cmp(costThreshold) == 1 { - c.Logger.Info(CashChequeAction, "queueing cashout", "cheque", &request.Cheque) + c.logger.Info(CashChequeAction, "queueing cashout", "cheque", &request.Cheque) cheque := request.Cheque otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) if err != nil { - c.Logger.Error(CashChequeAction, "could not get swap instance", "error", err) + c.logger.Error(CashChequeAction, "could not get swap instance", "error", err) return } txRequest, err := otherSwap.CashChequeBeneficiaryRequest(cheque.Beneficiary, cheque.CumulativePayout, cheque.Signature) if err != nil { metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - c.Logger.Error(CashChequeAction, "cashing cheque:", "error", err) + c.logger.Error(CashChequeAction, "cashing cheque:", "error", err) return } _, err = c.txScheduler.ScheduleRequest(CashoutRequestHandlerID, *txRequest, request) if err != nil { metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - c.Logger.Error(CashChequeAction, "cashing cheque:", "error", err) + c.logger.Error(CashChequeAction, "cashing cheque:", "error", err) } } } From 47ee8953a12f58fb366f6fd88d70625435d41a24 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Sat, 7 Mar 2020 12:50:54 +0100 Subject: [PATCH 07/18] swap/chain: don't export persistentQueue type --- swap/chain/persistentqueue.go | 26 +++++++++++++------------- swap/chain/persistentqueue_test.go | 8 ++++---- swap/chain/txqueue.go | 24 ++++++++++++------------ 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go index 583967ed5c..28ca8d9be6 100644 --- a/swap/chain/persistentqueue.go +++ b/swap/chain/persistentqueue.go @@ -13,7 +13,7 @@ import ( ) /* - PersistentQueue represents a queue stored in a state store + persistentQueue represents a queue stored in a state store Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce so that two items can be queued at the same time It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time To allow atomic operations with other state store operations all functions only write to batches instead of writing to the store directly @@ -21,8 +21,8 @@ import ( The queue provides no dequeue function. Instead an item must be deleted by its key */ -// PersistentQueue represents a queue stored in a state store -type PersistentQueue struct { +// persistentQueue represents a queue stored in a state store +type persistentQueue struct { store state.Store // the store backing this queue prefix string // the prefix for the keys for this queue trigger chan struct{} // channel to notify the queue that a new item is available @@ -30,8 +30,8 @@ type PersistentQueue struct { } // NewPersistentQueue creates a structure to interact with a queue with the given prefix -func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue { - return &PersistentQueue{ +func newPersistentQueue(store state.Store, prefix string) *persistentQueue { + return &persistentQueue{ store: store, prefix: prefix, trigger: make(chan struct{}, 1), @@ -39,11 +39,11 @@ func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue { } } -// Queue puts the necessary database operations for enqueueing a new item into the supplied batch +// queue puts the necessary database operations for enqueueing a new item into the supplied batch // It returns the generated key and a trigger function which must be called once the batch was successfully written // This only returns an error if the encoding fails which is an unrecoverable error // A lock must be held and kept until after the trigger function was called or the batch write failed -func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { +func (pq *persistentQueue) queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { // the nonce guarantees keys don't collide if multiple transactions are queued in the same second pq.nonce++ key = fmt.Sprintf("%d_%08d", time.Now().Unix(), pq.nonce) @@ -59,10 +59,10 @@ func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string }, nil } -// Peek looks at the next item in the queue +// peek looks at the next item in the queue // The error returned is either a decode or an io error // A lock must be held when this is called and should be held afterwards to prevent the item from being removed while processing -func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) { +func (pq *persistentQueue) peek(i interface{}) (key string, exists bool, err error) { err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) { key = string(k) unmarshaler, ok := i.(encoding.BinaryUnmarshaler) @@ -85,9 +85,9 @@ func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err err // No lock should not be held when this is called. Only a single call to next may be active at any time // If the the key is not "", the value exists, the supplied lock was acquired and must be released by the caller after processing the item // The supplied lock should be the same that is used for the other functions -func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) { +func (pq *persistentQueue) next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) { lock.Lock() - key, exists, err := pq.Peek(i) + key, exists, err := pq.peek(i) if exists { return key, nil } @@ -100,7 +100,7 @@ func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.M select { case <-pq.trigger: lock.Lock() - key, exists, err = pq.Peek(i) + key, exists, err = pq.peek(i) if exists { return key, nil } @@ -116,6 +116,6 @@ func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.M // Delete adds the batch operation to delete the queue element with the given key // A lock must be held when the batch is written -func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) { +func (pq *persistentQueue) delete(b *state.StoreBatch, key string) { b.Delete(pq.prefix + key) } diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go index b866175a1c..feb801ff5c 100644 --- a/swap/chain/persistentqueue_test.go +++ b/swap/chain/persistentqueue_test.go @@ -16,7 +16,7 @@ func TestNewPersistentQueue(t *testing.T) { store := state.NewInmemoryStore() defer store.Close() - queue := NewPersistentQueue(store, "testq") + queue := newPersistentQueue(store, "testq") ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -32,7 +32,7 @@ func TestNewPersistentQueue(t *testing.T) { for i := 0; i < count; i++ { func() { // this is a function so we can use defer with the right scope var value uint64 - key, err := queue.Next(ctx, &value, &lock) + key, err := queue.next(ctx, &value, &lock) if err != nil { errout = fmt.Errorf("failed to get next item: %v", err) return @@ -50,7 +50,7 @@ func TestNewPersistentQueue(t *testing.T) { } batch := new(state.StoreBatch) - queue.Delete(batch, key) + queue.delete(batch, key) err = store.WriteBatch(batch) if err != nil { errout = fmt.Errorf("could not write batch: %v", err) @@ -69,7 +69,7 @@ func TestNewPersistentQueue(t *testing.T) { var value = uint64(i) batch := new(state.StoreBatch) - _, trigger, err := queue.Queue(batch, value) + _, trigger, err := queue.queue(batch, value) if err != nil { errout = fmt.Errorf("failed to queue item: %v", err) return diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index 23d4ec7516..a801d66a47 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -29,9 +29,9 @@ type TxQueue struct { store state.Store // state store to use as the db backend prefix string // all keys in the state store are prefixed with this - requestQueue *PersistentQueue // queue for all future requests + requestQueue *persistentQueue // queue for all future requests handlers map[string]*TxRequestHandlers // map from handlerIDs to their registered handlers - notificationQueues map[string]*PersistentQueue // map from handlerIDs to the notification queue of that handler + notificationQueues map[string]*persistentQueue // map from handlerIDs to the notification queue of that handler backend TxSchedulerBackend // ethereum backend to use privateKey *ecdsa.PrivateKey // private key used to sign transactions @@ -67,10 +67,10 @@ func NewTxQueue(store state.Store, prefix string, backend TxSchedulerBackend, pr store: store, prefix: prefix, handlers: make(map[string]*TxRequestHandlers), - notificationQueues: make(map[string]*PersistentQueue), + notificationQueues: make(map[string]*persistentQueue), backend: backend, privateKey: privateKey, - requestQueue: NewPersistentQueue(store, prefix+"_requestQueue_"), + requestQueue: newPersistentQueue(store, prefix+"_requestQueue_"), errorChan: make(chan error, 1), } // we create the context here already because handlers can be set before the queue starts @@ -152,7 +152,7 @@ func (txq *TxQueue) ScheduleRequest(handlerID string, request TxRequest, extraDa return 0, err } - _, triggerQueue, err := txq.requestQueue.Queue(batch, id) + _, triggerQueue, err := txq.requestQueue.queue(batch, id) if err != nil { return 0, err } @@ -234,10 +234,10 @@ func (txq *TxQueue) Stop() { // getNotificationQueue gets the notification queue for a handler // it initializes the struct if it does not yet exist // the TxQueue lock must be held -func (txq *TxQueue) getNotificationQueue(handlerID string) *PersistentQueue { +func (txq *TxQueue) getNotificationQueue(handlerID string) *persistentQueue { queue, ok := txq.notificationQueues[handlerID] if !ok { - queue = NewPersistentQueue(txq.store, fmt.Sprintf("%s_notify_%s", txq.prefix, handlerID)) + queue = newPersistentQueue(txq.store, fmt.Sprintf("%s_notify_%s", txq.prefix, handlerID)) txq.notificationQueues[handlerID] = queue } return queue @@ -263,7 +263,7 @@ func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) e for { var item notificationQueueItem // get the next notification item - key, err := notifyQueue.Next(txq.ctx, &item, &txq.lock) + key, err := notifyQueue.next(txq.ctx, &item, &txq.lock) if err != nil { if !errors.Is(err, context.Canceled) { txq.stopWithError(fmt.Errorf("could not read from notification queue: %v", err)) @@ -325,7 +325,7 @@ func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) e // once the notification was handled delete it from the queue txq.lock.Lock() batch := new(state.StoreBatch) - notifyQueue.Delete(batch, key) + notifyQueue.delete(batch, key) err = txq.store.WriteBatch(batch) txq.lock.Unlock() if err != nil { @@ -342,7 +342,7 @@ func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) e // must be called with the txqueue lock held func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, notificationType string, notification interface{}) (triggerNotifyQueue func(), err error) { notifyQueue := txq.getNotificationQueue(handlerID) - key, triggerNotifyQueue, err := notifyQueue.Queue(batch, ¬ificationQueueItem{ + key, triggerNotifyQueue, err := notifyQueue.queue(batch, ¬ificationQueueItem{ RequestID: id, NotificationType: notificationType, }) @@ -362,7 +362,7 @@ func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err error) { var id uint64 // get the id of the next request in the queue - key, err := txq.requestQueue.Next(txq.ctx, &id, &txq.lock) + key, err := txq.requestQueue.next(txq.ctx, &id, &txq.lock) if err != nil { return nil, err } @@ -379,7 +379,7 @@ func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err er if err != nil { return nil, fmt.Errorf("could not put id write into batch: %v", err) } - txq.requestQueue.Delete(batch, key) + txq.requestQueue.delete(batch, key) err = txq.store.WriteBatch(batch) if err != nil { From 7486071505ca8f8d3fb3179990cd8470780620dc Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 10 Mar 2020 17:25:09 +0100 Subject: [PATCH 08/18] swap/chain: rename queue function to enqueue --- swap/chain/persistentqueue.go | 4 ++-- swap/chain/persistentqueue_test.go | 2 +- swap/chain/txqueue.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go index 28ca8d9be6..94a1940da6 100644 --- a/swap/chain/persistentqueue.go +++ b/swap/chain/persistentqueue.go @@ -39,11 +39,11 @@ func newPersistentQueue(store state.Store, prefix string) *persistentQueue { } } -// queue puts the necessary database operations for enqueueing a new item into the supplied batch +// enqueue puts the necessary database operations for enqueueing a new item into the supplied batch // It returns the generated key and a trigger function which must be called once the batch was successfully written // This only returns an error if the encoding fails which is an unrecoverable error // A lock must be held and kept until after the trigger function was called or the batch write failed -func (pq *persistentQueue) queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { +func (pq *persistentQueue) enqueue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { // the nonce guarantees keys don't collide if multiple transactions are queued in the same second pq.nonce++ key = fmt.Sprintf("%d_%08d", time.Now().Unix(), pq.nonce) diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go index feb801ff5c..366c0d1a5d 100644 --- a/swap/chain/persistentqueue_test.go +++ b/swap/chain/persistentqueue_test.go @@ -69,7 +69,7 @@ func TestNewPersistentQueue(t *testing.T) { var value = uint64(i) batch := new(state.StoreBatch) - _, trigger, err := queue.queue(batch, value) + _, trigger, err := queue.enqueue(batch, value) if err != nil { errout = fmt.Errorf("failed to queue item: %v", err) return diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index a801d66a47..7fdc5f7717 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -152,7 +152,7 @@ func (txq *TxQueue) ScheduleRequest(handlerID string, request TxRequest, extraDa return 0, err } - _, triggerQueue, err := txq.requestQueue.queue(batch, id) + _, triggerQueue, err := txq.requestQueue.enqueue(batch, id) if err != nil { return 0, err } @@ -342,7 +342,7 @@ func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) e // must be called with the txqueue lock held func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, notificationType string, notification interface{}) (triggerNotifyQueue func(), err error) { notifyQueue := txq.getNotificationQueue(handlerID) - key, triggerNotifyQueue, err := notifyQueue.queue(batch, ¬ificationQueueItem{ + key, triggerNotifyQueue, err := notifyQueue.enqueue(batch, ¬ificationQueueItem{ RequestID: id, NotificationType: notificationType, }) From 255927df629b8d376382fa7acb8ca8de9f8bffa5 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 10 Mar 2020 17:30:35 +0100 Subject: [PATCH 09/18] swap/chain: add copyright headers --- swap/chain/backend.go | 15 +++++++++++++++ swap/chain/common_test.go | 15 +++++++++++++++ swap/chain/mock/testbackend.go | 15 +++++++++++++++ swap/chain/persistentqueue.go | 15 +++++++++++++++ swap/chain/persistentqueue_test.go | 15 +++++++++++++++ swap/chain/txqueue.go | 15 +++++++++++++++ swap/chain/txqueue_test.go | 15 +++++++++++++++ swap/chain/txscheduler.go | 15 +++++++++++++++ 8 files changed, 120 insertions(+) diff --git a/swap/chain/backend.go b/swap/chain/backend.go index 1efc4d13f7..e78ac82ccb 100644 --- a/swap/chain/backend.go +++ b/swap/chain/backend.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( diff --git a/swap/chain/common_test.go b/swap/chain/common_test.go index 38d9109af3..0e6ba7d4e1 100644 --- a/swap/chain/common_test.go +++ b/swap/chain/common_test.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import "github.com/ethersphere/swarm/testutil" diff --git a/swap/chain/mock/testbackend.go b/swap/chain/mock/testbackend.go index 40b64c4b46..3b267decf6 100644 --- a/swap/chain/mock/testbackend.go +++ b/swap/chain/mock/testbackend.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package mock import ( diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go index 94a1940da6..27e1eb73cc 100644 --- a/swap/chain/persistentqueue.go +++ b/swap/chain/persistentqueue.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go index 366c0d1a5d..eb0340fc72 100644 --- a/swap/chain/persistentqueue_test.go +++ b/swap/chain/persistentqueue_test.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index 7fdc5f7717..d2ee330242 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go index 97d9971ea5..f0caff93df 100644 --- a/swap/chain/txqueue_test.go +++ b/swap/chain/txqueue_test.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go index f2b1bdaa9e..9f3f1c5463 100644 --- a/swap/chain/txscheduler.go +++ b/swap/chain/txscheduler.go @@ -1,3 +1,18 @@ +// Copyright 2020 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . package chain import ( From 8a9ea5e9cd11bd90394633edcbe35f9cfeb16808 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 13 Mar 2020 13:10:03 +0100 Subject: [PATCH 10/18] contract/swap, swap, swap/chain: refactor gas estimation, estimate in tests --- contracts/swap/swap.go | 7 +++---- swap/cashout_test.go | 14 +++++++++++++- swap/chain/txqueue.go | 8 ++------ swap/chain/txscheduler.go | 14 ++++++++++++++ 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/contracts/swap/swap.go b/contracts/swap/swap.go index ec3fe845e0..e8d2bf7926 100644 --- a/contracts/swap/swap.go +++ b/contracts/swap/swap.go @@ -152,10 +152,9 @@ func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, } return &chain.TxRequest{ - To: s.address, - Value: big.NewInt(0), - GasLimit: 200000, - Data: callData, + To: s.address, + Value: big.NewInt(0), + Data: callData, }, nil } diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 9e5b5246e3..e2d4a7a11b 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -58,6 +58,14 @@ func TestContractIntegration(t *testing.T) { t.Fatal(err) } + txRequest.GasLimit, err = txRequest.EstimateGas(ctx, backend, opts.From) + if err != nil { + t.Fatal(err) + } + if err != nil { + t.Fatal(err) + } + nonce, err := backend.PendingNonceAt(ctx, opts.From) if err != nil { t.Fatal(err) @@ -116,6 +124,11 @@ func TestContractIntegration(t *testing.T) { t.Fatal(err) } + txRequest.GasLimit, err = txRequest.EstimateGas(ctx, backend, opts.From) + if err != nil { + t.Fatal(err) + } + nonce, err = backend.PendingNonceAt(ctx, opts.From) if err != nil { t.Fatal(err) @@ -143,7 +156,6 @@ func TestContractIntegration(t *testing.T) { if !cashResult.Bounced { t.Fatal("cheque did not bounce") } - } // TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.submitCheque diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index d2ee330242..92aeac5216 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -23,7 +23,6 @@ import ( "sync" "time" - ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -507,14 +506,11 @@ func (txq *TxQueue) generateTransaction(requestMetadata *txRequestData) error { request := requestMetadata.Request if request.GasLimit == 0 { - request.GasLimit, err = txq.backend.EstimateGas(txq.ctx, ethereum.CallMsg{ - From: opts.From, - To: &request.To, - Data: request.Data, - }) + gasLimit, err := request.EstimateGas(txq.ctx, txq.backend, opts.From) if err != nil { return txq.finalizeRequestCancelled(requestMetadata, err) } + request.GasLimit = gasLimit } if request.GasPrice == nil { diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go index 9f3f1c5463..ed825ff63e 100644 --- a/swap/chain/txscheduler.go +++ b/swap/chain/txscheduler.go @@ -19,6 +19,7 @@ import ( "context" "math/big" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -66,6 +67,19 @@ func (request *TxRequest) ToSignedTx(nonce uint64, opts *bind.TransactOpts) (*ty return opts.Signer(&types.HomesteadSigner{}, opts.From, tx) } +// EstimateGas estimates the gas usage if this request was send from the supplied sender +func (request *TxRequest) EstimateGas(ctx context.Context, backend Backend, from common.Address) (uint64, error) { + gasLimit, err := backend.EstimateGas(ctx, ethereum.CallMsg{ + From: from, + To: &request.To, + Data: request.Data, + }) + if err != nil { + return 0, err + } + return gasLimit, nil +} + // TxScheduler represents a central sender for all transactions from a single ethereum account // its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result // notifications are guaranteed to happen even across node restarts and disconnects from the ethereum backend From 1a59564c3e0534f5e8a4770dbe1861efc8d98679 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 13 Mar 2020 13:13:24 +0100 Subject: [PATCH 11/18] swap: add comment about interface --- swap/swap.go | 1 + 1 file changed, 1 insertion(+) diff --git a/swap/swap.go b/swap/swap.go index 4706488994..fa5bcbac7d 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -53,6 +53,7 @@ var ErrSkipDeposit = errors.New("swap-deposit-amount non-zero, but swap-skip-dep // a peer to peer micropayment system // A node maintains an individual balance with every peer // Only messages which have a price will be accounted for +// Swap implements the CashoutResultHandler interface type Swap struct { store state.Store // store is needed in order to keep balances and cheques across sessions peers map[enode.ID]*Peer // map of all swap Peers From c872d34f0fc53ee32deb8120e8a66aa2aee59cbd Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 13 Mar 2020 13:38:47 +0100 Subject: [PATCH 12/18] swap: log transaction errors --- swap/cashout.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/swap/cashout.go b/swap/cashout.go index d6fc715f3e..70c753dd1d 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -87,6 +87,14 @@ func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, p result := otherSwap.CashChequeBeneficiaryResult(receipt) return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt) }, + NotifyCancelled: func(ctx context.Context, id uint64, notification *chain.TxCancelledNotification) error { + c.logger.Warn("cheque cashing transaction cancelled", "reason", notification.Reason) + return nil + }, + NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *chain.TxStatusUnknownNotification) error { + c.logger.Error("cheque cashing transaction status unknown", "reason", notification.Reason) + return nil + }, }) return c } From a67d6ea912fa6e28ae2896b74337db870a1cec29 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 13 Mar 2020 13:42:08 +0100 Subject: [PATCH 13/18] swap: fix logs and log pending hash --- swap/cashout.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/swap/cashout.go b/swap/cashout.go index 70c753dd1d..bd4a1254b4 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -80,19 +80,23 @@ func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, p receipt := ¬ification.Receipt if receipt.Status == 0 { - c.logger.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + c.logger.Error(CashChequeAction, "cheque cashing transaction reverted", "tx", receipt.TxHash) return nil } result := otherSwap.CashChequeBeneficiaryResult(receipt) return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt) }, + NotifyPending: func(ctx context.Context, id uint64, notification *chain.TxPendingNotification) error { + c.logger.Debug(CashChequeAction, "cheque cashing transaction sent", "hash", notification.Transaction.Hash()) + return nil + }, NotifyCancelled: func(ctx context.Context, id uint64, notification *chain.TxCancelledNotification) error { - c.logger.Warn("cheque cashing transaction cancelled", "reason", notification.Reason) + c.logger.Warn(CashChequeAction, "cheque cashing transaction cancelled", "reason", notification.Reason) return nil }, NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *chain.TxStatusUnknownNotification) error { - c.logger.Error("cheque cashing transaction status unknown", "reason", notification.Reason) + c.logger.Error(CashChequeAction, "cheque cashing transaction status unknown", "reason", notification.Reason) return nil }, }) From 2deea572d47c90e35f662e289f7bcc5346c67661 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 13 Mar 2020 13:55:23 +0100 Subject: [PATCH 14/18] swap/chain: update comment regarding id --- swap/chain/txqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index 92aeac5216..4080e3e869 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -138,7 +138,7 @@ func (txq *TxQueue) ScheduleRequest(handlerID string, request TxRequest, extraDa if err != nil && err != state.ErrNotFound { return 0, err } - // ids start at 1 + // increment existing id, starting with an initial value of 1 id++ // in a single batch this From f936138303fe31e55b967113b8a11149b8501d65 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Mon, 16 Mar 2020 17:21:09 +0100 Subject: [PATCH 15/18] swap/chain: wait for start in handlers --- swap/chain/txqueue.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index 4080e3e869..47ed057451 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -34,12 +34,13 @@ import ( // A new transaction is only sent after the previous one confirmed // This is done to minimize the chance of wrong nonce use type TxQueue struct { - lock sync.Mutex // lock for the entire queue - ctx context.Context // context used for all network requests and waiting operations to ensure the queue can be stopped at any point - cancel context.CancelFunc // function to cancel the above context - wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns - started bool // bool indicating that the queue has been started. used to ensure it does not run multiple times simultaneously - errorChan chan error // channel to stop the queue in case of errors + lock sync.Mutex // lock for the entire queue + ctx context.Context // context used for all network requests and waiting operations to ensure the queue can be stopped at any point + cancel context.CancelFunc // function to cancel the above context + wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns + startedChan chan struct{} // channel to be closed when the queue has started processing + started bool // bool indicating that the queue has been started. used to ensure it does not run multiple times simultaneously + errorChan chan error // channel to stop the queue in case of errors store state.Store // state store to use as the db backend prefix string // all keys in the state store are prefixed with this @@ -86,6 +87,7 @@ func NewTxQueue(store state.Store, prefix string, backend TxSchedulerBackend, pr privateKey: privateKey, requestQueue: newPersistentQueue(store, prefix+"_requestQueue_"), errorChan: make(chan error, 1), + startedChan: make(chan struct{}, 0), } // we create the context here already because handlers can be set before the queue starts txq.ctx, txq.cancel = context.WithCancel(context.Background()) @@ -226,6 +228,8 @@ func (txq *TxQueue) Start() { case <-txq.ctx.Done(): } }() + + close(txq.startedChan) } // Stop stops processing transactions if it is running @@ -274,6 +278,13 @@ func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) e go func() { defer txq.wg.Done() + // only start sending notification once the loop started + select { + case <-txq.startedChan: + case <-txq.ctx.Done(): + return + } + for { var item notificationQueueItem // get the next notification item From d9e54baca4e107213960b7fe3a23f118701fa74a Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Tue, 17 Mar 2020 11:09:57 +0100 Subject: [PATCH 16/18] swap/chain: fix linting issue --- swap/chain/txqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index 47ed057451..dae454c746 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -87,7 +87,7 @@ func NewTxQueue(store state.Store, prefix string, backend TxSchedulerBackend, pr privateKey: privateKey, requestQueue: newPersistentQueue(store, prefix+"_requestQueue_"), errorChan: make(chan error, 1), - startedChan: make(chan struct{}, 0), + startedChan: make(chan struct{}), } // we create the context here already because handlers can be set before the queue starts txq.ctx, txq.cancel = context.WithCancel(context.Background()) From 40327f905fb80db4d839325c1a676138d693bc28 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Wed, 18 Mar 2020 12:32:19 +0100 Subject: [PATCH 17/18] swap/chain: add line after copyright header --- swap/chain/backend.go | 1 + swap/chain/common_test.go | 1 + swap/chain/mock/testbackend.go | 1 + swap/chain/persistentqueue.go | 1 + swap/chain/persistentqueue_test.go | 1 + swap/chain/txqueue.go | 1 + swap/chain/txqueue_test.go | 1 + swap/chain/txscheduler.go | 1 + 8 files changed, 8 insertions(+) diff --git a/swap/chain/backend.go b/swap/chain/backend.go index e78ac82ccb..7091f4c204 100644 --- a/swap/chain/backend.go +++ b/swap/chain/backend.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( diff --git a/swap/chain/common_test.go b/swap/chain/common_test.go index 0e6ba7d4e1..06d954b9da 100644 --- a/swap/chain/common_test.go +++ b/swap/chain/common_test.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import "github.com/ethersphere/swarm/testutil" diff --git a/swap/chain/mock/testbackend.go b/swap/chain/mock/testbackend.go index 3b267decf6..0f8c88e091 100644 --- a/swap/chain/mock/testbackend.go +++ b/swap/chain/mock/testbackend.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package mock import ( diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go index 27e1eb73cc..bac966cb9b 100644 --- a/swap/chain/persistentqueue.go +++ b/swap/chain/persistentqueue.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go index eb0340fc72..4c9aedc65b 100644 --- a/swap/chain/persistentqueue_test.go +++ b/swap/chain/persistentqueue_test.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go index dae454c746..028d0a8603 100644 --- a/swap/chain/txqueue.go +++ b/swap/chain/txqueue.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go index f0caff93df..c50789bdcd 100644 --- a/swap/chain/txqueue_test.go +++ b/swap/chain/txqueue_test.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go index ed825ff63e..3fa1e4d60e 100644 --- a/swap/chain/txscheduler.go +++ b/swap/chain/txscheduler.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the Swarm library. If not, see . + package chain import ( From be6d28cf2c407d8c6d3277afdd9955bc40f7e235 Mon Sep 17 00:00:00 2001 From: Ralph Pichler Date: Fri, 27 Mar 2020 10:04:15 +0100 Subject: [PATCH 18/18] swap/chain: add lock to persistentqueue test --- swap/chain/persistentqueue_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go index 4c9aedc65b..426b4dfa5c 100644 --- a/swap/chain/persistentqueue_test.go +++ b/swap/chain/persistentqueue_test.go @@ -42,7 +42,9 @@ func TestNewPersistentQueue(t *testing.T) { count := 200 + var errlock sync.Mutex var errout error // stores the last error that occurred in one of the routines + go func() { defer wg.Done() for i := 0; i < count; i++ { @@ -50,18 +52,24 @@ func TestNewPersistentQueue(t *testing.T) { var value uint64 key, err := queue.next(ctx, &value, &lock) if err != nil { + errlock.Lock() errout = fmt.Errorf("failed to get next item: %v", err) + errlock.Unlock() return } defer lock.Unlock() if key == "" { + errlock.Lock() errout = errors.New("key is empty") + errlock.Unlock() return } if value != uint64(i) { + errlock.Lock() errout = fmt.Errorf("values don't match: got %v, expected %v", value, i) + errlock.Unlock() return } @@ -69,7 +77,9 @@ func TestNewPersistentQueue(t *testing.T) { queue.delete(batch, key) err = store.WriteBatch(batch) if err != nil { + errlock.Lock() errout = fmt.Errorf("could not write batch: %v", err) + errlock.Unlock() return } }() @@ -87,12 +97,16 @@ func TestNewPersistentQueue(t *testing.T) { batch := new(state.StoreBatch) _, trigger, err := queue.enqueue(batch, value) if err != nil { + errlock.Lock() errout = fmt.Errorf("failed to queue item: %v", err) + errlock.Unlock() return } err = store.WriteBatch(batch) if err != nil { + errlock.Lock() errout = fmt.Errorf("failed to write batch: %v", err) + errlock.Unlock() return }