Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swap, swap/chain: add transaction queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed Feb 11, 2020
1 parent 234b7d7 commit eeb2989
Show file tree
Hide file tree
Showing 12 changed files with 1,170 additions and 175 deletions.
145 changes: 90 additions & 55 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
var CashoutRequestTypeID = chain.TransactionRequestTypeID{
Handler: "cashout",
RequestType: "CashoutRequest",
}

// CashoutRequest represents a request for a cashout operation
Expand All @@ -42,42 +41,106 @@ type CashoutRequest struct {
Destination common.Address // destination for the payout
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
txScheduler chain.TxScheduler // transaction queue to use
cashoutDone chan *CashoutRequest
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
txScheduler: txScheduler,
}

txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TransactionRequestHandlers{
Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) {
var request CashoutRequest
if err := c.txScheduler.GetRequest(id, &request); err != nil {
return common.Hash{}, err
}

cheque := request.Cheque

otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
if err != nil {
return common.Hash{}, err
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
},
NotifyReceipt: func(id uint64, notification *chain.TransactionReceiptNotification) error {
var request *CashoutRequest
err := c.txScheduler.GetRequest(id, &request)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt
if receipt.Status == 0 {
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", request.Cheque.Honey)

select {
case c.cashoutDone <- request:
default:
}

return nil
},
})
return c
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx
func (c *CashoutProcessor) setCashoutDoneChan(cashoutDone chan *CashoutRequest) {
c.cashoutDone = cashoutDone
}

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
func (c *CashoutProcessor) submitCheque(request *CashoutRequest) {
expectedPayout, transactionCosts, err := c.estimatePayout(context.TODO(), &request.Cheque)
if err != nil {
return err
swapLog.Error("could not estimate payout", "error", err)
return
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
costsMultiplier := uint256.FromUint64(2)
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
return err
swapLog.Error("overflow in transaction fee", "error", err)
return
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
})
// do a payout transaction if we get 2 times the gas costs
if expectedPayout.Cmp(costThreshold) == 1 {
_, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request)
if err != nil {
metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1)
swapLog.Error("cashing cheque:", "error", err)
}
}
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -123,31 +186,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (

return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
}
41 changes: 28 additions & 13 deletions swap/cashout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package swap
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/swap/chain"
"github.com/ethersphere/swarm/uint256"
)
Expand All @@ -33,8 +35,7 @@ import (
// afterwards it attempts to cash-in a bouncing cheque
func TestContractIntegration(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down Expand Up @@ -116,11 +117,15 @@ func TestContractIntegration(t *testing.T) {
// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
func TestCashCheque(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()
transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
Expand All @@ -132,12 +137,18 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
cashChequeDone := make(chan *CashoutRequest)
defer close(cashChequeDone)
cashoutProcessor.setCashoutDoneChan(cashChequeDone)

cashoutProcessor.submitCheque(&CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
})
if err != nil {
t.Fatal(err)

select {
case <-cashChequeDone:
case <-time.After(5 * time.Second):
}

paidOut, err := chequebook.PaidOut(nil, ownerAddress)
Expand All @@ -154,10 +165,14 @@ func TestCashCheque(t *testing.T) {
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
func TestEstimatePayout(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
defer backend.Close()

store := state.NewInmemoryStore()
defer store.Close()
transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
payout := uint256.FromUint64(42)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down
9 changes: 0 additions & 9 deletions swap/chain/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chain

import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -12,11 +11,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

var (
// ErrTransactionReverted is given when the transaction that cashes a cheque is reverted
ErrTransactionReverted = errors.New("Transaction reverted")
)

// Backend is the minimum amount of functionality required by the underlying ethereum backend
type Backend interface {
bind.ContractBackend
Expand All @@ -36,9 +30,6 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt
log.Trace("Receipt retrieval failed", "err", err)
}
if receipt != nil {
if receipt.Status != types.ReceiptStatusSuccessful {
return nil, ErrTransactionReverted
}
return receipt, nil
}

Expand Down
105 changes: 105 additions & 0 deletions swap/chain/persistentqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package chain

import (
"context"
"encoding"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

"github.com/ethersphere/swarm/state"
)

// PersistentQueue represents a queue stored in a state store
type PersistentQueue struct {
lock sync.Mutex
store state.Store
prefix string
trigger chan struct{}
nonce uint64
}

// NewPersistentQueue creates a structure to interact with a queue with the given prefix
func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue {
return &PersistentQueue{
store: store,
prefix: prefix,
trigger: make(chan struct{}, 1),
nonce: 0,
}
}

// Queue puts the necessary database operations for the queueing into the supplied batch
// it returns the generated key and a trigger function which must be called if the batch was successfully written
// this only returns an error if the encoding fails which is an unrecoverable error
func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) {
pq.lock.Lock()
defer pq.lock.Unlock()

pq.nonce++
key = time.Now().String() + "_" + strconv.FormatUint(pq.nonce, 10)
if err = b.Put(pq.prefix+key, v); err != nil {
return "", nil, err
}

return key, func() {
select {
case pq.trigger <- struct{}{}:
default:
}
}, nil
}

// Peek looks at the next item in the queue
// the error returned is either an decode or an io error
func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) {
err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) {
key = string(k)
unmarshaler, ok := i.(encoding.BinaryUnmarshaler)
if !ok {
return true, json.Unmarshal(data, i)
}
return true, unmarshaler.UnmarshalBinary(data)
})
if err != nil {
return "", false, err
}
if key == "" {
return "", false, nil
}
return strings.TrimPrefix(key, pq.prefix), true, nil
}

// Next looks at the next item in the queue and blocks until an item is available if there is none
// the error returned is either an decode error, an io error or a cancelled context
func (pq *PersistentQueue) Next(ctx context.Context, i interface{}) (key string, err error) {
key, exists, err := pq.Peek(i)
if err != nil {
return "", err
}
if exists {
return key, nil
}

for {
select {
case <-pq.trigger:
key, exists, err = pq.Peek(i)
if err != nil {
return "", err
}
if exists {
return key, nil
}
case <-ctx.Done():
return "", ctx.Err()
}
}
}

// Delete adds the batch operation to delete the queue element with the given key
func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) {
b.Delete(pq.prefix + key)
}
Loading

0 comments on commit eeb2989

Please sign in to comment.