Skip to content

Commit

Permalink
client/{mm, core}: Simple Arbitrage
Browse files Browse the repository at this point in the history
This implements the simple arbitrage opportunity which only places orders
when there is an arbitrage opportunity.

- A `libxc` package is added which contains a `CEX` interface used to
  interact with a centralized exchange's API. It is implemented for
  Binance.
- The new strategy is implemented in `mm_simple_arb.go` and can be run by
  creating a `BotConfig` with a non-nil `ArbCfg`.
- A testbinance command line tool is added which starts a webserver that
  responds to the requests that the Binance testnet does not support.
- A `VWAP` function is added to the client orderbook.
- `client/comms/WSConn` is updated with a `SendRaw` function which sends
  arbitrary byte slices over the websocket connection.
  • Loading branch information
martonp committed Aug 16, 2023
1 parent 5977a99 commit 80b6d42
Show file tree
Hide file tree
Showing 19 changed files with 3,802 additions and 128 deletions.
150 changes: 150 additions & 0 deletions client/cmd/testbinance/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

/*
* Starts an http server that responds with a hardcoded result to the binance API's
* "/sapi/v1/capital/config/getall" endpoint. Binance's testnet does not support the
* "sapi" endpoints, and this is the only "sapi" endpoint that we use.
*/

import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"

"decred.org/dcrdex/client/websocket"
"decred.org/dcrdex/dex"
)

const (
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
)

var (
log = dex.StdOutLogger("TBNC", dex.LevelDebug)
)

func main() {
if err := mainErr(); err != nil {
fmt.Fprint(os.Stderr, err, "\n")
os.Exit(1)
}
os.Exit(0)
}

func mainErr() error {
f := &fakeBinance{
wsServer: websocket.New(nil, log.SubLogger("WS")),
balances: map[string]*balance{
"eth": {
free: 1000.123432,
locked: 0,
},
"btc": {
free: 1000.21314123,
locked: 0,
},
"ltc": {
free: 1000.8689444,
locked: 0,
},
"bch": {
free: 1000.2358249,
locked: 0,
},
"dcr": {
free: 1000.2358249,
locked: 0,
},
},
}
http.HandleFunc("/sapi/v1/capital/config/getall", f.handleWalletCoinsReq)

return http.ListenAndServe(":37346", nil)
}

type balance struct {
free float64
locked float64
}

type fakeBinance struct {
wsServer *websocket.Server

balanceMtx sync.RWMutex
balances map[string]*balance
}

func (f *fakeBinance) handleWalletCoinsReq(w http.ResponseWriter, r *http.Request) {
ci := f.coinInfo()
writeJSONWithStatus(w, ci, http.StatusOK)
}

type fakeBinanceNetworkInfo struct {
Coin string `json:"coin"`
MinConfirm int `json:"minConfirm"`
Network string `json:"network"`
UnLockConfirm int `json:"unLockConfirm"`
WithdrawEnable bool `json:"withdrawEnable"`
WithdrawFee string `json:"withdrawFee"`
WithdrawIntegerMultiple string `json:"withdrawIntegerMultiple"`
WithdrawMax string `json:"withdrawMax"`
WithdrawMin string `json:"withdrawMin"`
}

type fakeBinanceCoinInfo struct {
Coin string `json:"coin"`
Free string `json:"free"`
Locked string `json:"locked"`
Withdrawing string `json:"withdrawing"`
NetworkList []*fakeBinanceNetworkInfo `json:"networkList"`
}

func (f *fakeBinance) coinInfo() (coins []*fakeBinanceCoinInfo) {
f.balanceMtx.Lock()
for symbol, bal := range f.balances {
bigSymbol := strings.ToUpper(symbol)
coins = append(coins, &fakeBinanceCoinInfo{
Coin: bigSymbol,
Free: strconv.FormatFloat(bal.free, 'f', 8, 64),
Locked: strconv.FormatFloat(bal.locked, 'f', 8, 64),
Withdrawing: "0",
NetworkList: []*fakeBinanceNetworkInfo{
{
Coin: bigSymbol,
Network: bigSymbol,
MinConfirm: 1,
WithdrawEnable: true,
WithdrawFee: strconv.FormatFloat(0.00000800, 'f', 8, 64),
WithdrawIntegerMultiple: strconv.FormatFloat(0.00000001, 'f', 8, 64),
WithdrawMax: strconv.FormatFloat(1000, 'f', 8, 64),
WithdrawMin: strconv.FormatFloat(0.01, 'f', 8, 64),
},
},
})
}
f.balanceMtx.Unlock()
return
}

// writeJSON writes marshals the provided interface and writes the bytes to the
// ResponseWriter with the specified response code.
func writeJSONWithStatus(w http.ResponseWriter, thing interface{}, code int) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
b, err := json.Marshal(thing)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Errorf("JSON encode error: %v", err)
return
}
w.WriteHeader(code)
_, err = w.Write(append(b, byte('\n')))
if err != nil {
log.Errorf("Write error: %v", err)
}
}
142 changes: 83 additions & 59 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type WsConn interface {
NextID() uint64
IsDown() bool
Send(msg *msgjson.Message) error
SendRaw(b []byte) error
Request(msg *msgjson.Message, respHandler func(*msgjson.Message)) error
RequestWithTimeout(msg *msgjson.Message, respHandler func(*msgjson.Message), expireTime time.Duration, expire func()) error
Connect(ctx context.Context) (*sync.WaitGroup, error)
Expand Down Expand Up @@ -134,6 +135,12 @@ type WsCfg struct {

// NetDialContext specifies an optional dialer context to use.
NetDialContext func(context.Context, string, string) (net.Conn, error)

// RawHandler overrides the msgjson parsing and forwards all messages to
// the provided function.
RawHandler func([]byte)

ConnectHeaders http.Header
}

// wsConn represents a client websocket connection.
Expand Down Expand Up @@ -225,7 +232,8 @@ func (conn *wsConn) connect(ctx context.Context) error {
} else {
dialer.Proxy = http.ProxyFromEnvironment
}
ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, nil)

ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders)
if err != nil {
if isErrorInvalidCert(err) {
conn.setConnectionStatus(InvalidCert)
Expand Down Expand Up @@ -280,7 +288,11 @@ func (conn *wsConn) connect(ctx context.Context) error {
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.read(ctx)
if conn.cfg.RawHandler != nil {
conn.readRaw(ctx)
} else {
conn.read(ctx)
}
}()

return nil
Expand All @@ -295,14 +307,70 @@ func (conn *wsConn) close() {
conn.ws.Close()
}

// read fetches and parses incoming messages for processing. This should be
// run as a goroutine. Increment the wg before calling read.
func (conn *wsConn) read(ctx context.Context) {
func (conn *wsConn) handleReadError(err error) {
reconnect := func() {
conn.setConnectionStatus(Disconnected)
conn.reconnectCh <- struct{}{}
}

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
reconnect()
return
}
// TODO: Now that wsConn goroutines have contexts that are canceled
// on shutdown, we do not have to infer the source and severity of
// the error; just reconnect in ALL other cases, and remove the
// following legacy checks.

// Expected close errors (1000 and 1001) ... but if the server
// closes we still want to reconnect. (???)
if websocket.IsCloseError(err, websocket.CloseGoingAway,
websocket.CloseNormalClosure) ||
strings.Contains(err.Error(), "websocket: close sent") {
reconnect()
return
}

var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Op == "read" {
if strings.Contains(opErr.Err.Error(), "use of closed network connection") {
conn.log.Errorf("read quitting: %v", err)
reconnect()
return
}
}

// Log all other errors and trigger a reconnection.
conn.log.Errorf("read error (%v), attempting reconnection", err)
reconnect()
}

func (conn *wsConn) readRaw(ctx context.Context) {
for {
if ctx.Err() != nil {
return
}

// Lock since conn.ws may be set by connect.
conn.wsMtx.Lock()
ws := conn.ws
conn.wsMtx.Unlock()

// Block until a message is received or an error occurs.
_, msgBytes, err := ws.ReadMessage()
if err != nil {
conn.handleReadError(err)
return
}
conn.cfg.RawHandler(msgBytes)
}
}

// read fetches and parses incoming messages for processing. This should be
// run as a goroutine. Increment the wg before calling read.
func (conn *wsConn) read(ctx context.Context) {
for {
msg := new(msgjson.Message)

Expand All @@ -319,57 +387,23 @@ func (conn *wsConn) read(ctx context.Context) {
return
}
if err != nil {
// Read timeout should flag the connection as down asap.
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
conn.log.Errorf("Read timeout on connection to %s.", conn.cfg.URL)
reconnect()
return
}

var mErr *json.UnmarshalTypeError
if errors.As(err, &mErr) {
// JSON decode errors are not fatal, log and proceed.
conn.log.Errorf("json decode error: %v", mErr)
continue
}

// TODO: Now that wsConn goroutines have contexts that are canceled
// on shutdown, we do not have to infer the source and severity of
// the error; just reconnect in ALL other cases, and remove the
// following legacy checks.

// Expected close errors (1000 and 1001) ... but if the server
// closes we still want to reconnect. (???)
if websocket.IsCloseError(err, websocket.CloseGoingAway,
websocket.CloseNormalClosure) ||
strings.Contains(err.Error(), "websocket: close sent") {
reconnect()
return
}

var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Op == "read" {
if strings.Contains(opErr.Err.Error(),
"use of closed network connection") {
conn.log.Errorf("read quitting: %v", err)
reconnect()
return
}
}

// Log all other errors and trigger a reconnection.
conn.log.Errorf("read error (%v), attempting reconnection", err)
reconnect()
// Successful reconnect via connect() will start read() again.
conn.handleReadError(err)
return
}

// If the message is a response, find the handler.
if msg.Type == msgjson.Response {
handler := conn.respHandler(msg.ID)
if handler == nil {
conn.log.Errorf("unhandled response with error msg: %v", handleUnknownResponse(msg))
b, _ := json.Marshal(msg)
conn.log.Errorf("No handler found for response: %v", string(b))
continue
}
// Run handlers in a goroutine so that other messages can be
Expand Down Expand Up @@ -498,12 +532,6 @@ func (conn *wsConn) Stop() {
conn.cancel()
}

func (conn *wsConn) SendRaw(b []byte) error {
conn.wsMtx.Lock()
defer conn.wsMtx.Unlock()
return conn.ws.WriteMessage(websocket.TextMessage, b)
}

// Send pushes outgoing messages over the websocket connection. Sending of the
// message is synchronous, so a nil error guarantees that the message was
// successfully sent. A non-nil error may indicate that the connection is known
Expand All @@ -522,9 +550,14 @@ func (conn *wsConn) Send(msg *msgjson.Message) error {
return err
}

return conn.SendRaw(b)
}

// SendRaw sends a raw byte string over the websocket connection.
func (conn *wsConn) SendRaw(b []byte) error {
conn.wsMtx.Lock()
defer conn.wsMtx.Unlock()
err = conn.ws.SetWriteDeadline(time.Now().Add(writeWait))
err := conn.ws.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
conn.log.Errorf("Send: failed to set write deadline: %v", err)
return err
Expand Down Expand Up @@ -562,6 +595,7 @@ func (conn *wsConn) Request(msg *msgjson.Message, f func(*msgjson.Message)) erro
// For example, to wait on a response or timeout:
//
// errChan := make(chan error, 1)
//
// err := conn.RequestWithTimeout(reqMsg, func(msg *msgjson.Message) {
// errChan <- msg.UnmarshalResult(responseStructPointer)
// }, timeout, func() {
Expand Down Expand Up @@ -638,13 +672,3 @@ func (conn *wsConn) respHandler(id uint64) *responseHandler {
func (conn *wsConn) MessageSource() <-chan *msgjson.Message {
return conn.readCh
}

// handleUnknownResponse extracts the error message sent for a response without
// a handler.
func handleUnknownResponse(msg *msgjson.Message) error {
resp, err := msg.Response()
if err != nil {
return err
}
return resp.Error
}
3 changes: 3 additions & 0 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (conn *TWebsocket) Connect(context.Context) (*sync.WaitGroup, error) {
// Consider reworking the tests (TODO).
return &sync.WaitGroup{}, conn.connectErr
}
func (conn *TWebsocket) SendRaw(b []byte) error {
return nil
}

type TDB struct {
updateWalletErr error
Expand Down
1 change: 0 additions & 1 deletion client/core/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ func (t *trackedTrade) coreOrderInternal() *Order {
counterConfs, int64(t.metaData.ToSwapConf),
int64(mt.redemptionConfs), int64(mt.redemptionConfsReq)))
}

corder.AllFeesConfirmed = allFeesConfirmed

return corder
Expand Down
Loading

0 comments on commit 80b6d42

Please sign in to comment.