diff --git a/client/cmd/testbinance/main.go b/client/cmd/testbinance/main.go new file mode 100644 index 0000000000..046844bd5b --- /dev/null +++ b/client/cmd/testbinance/main.go @@ -0,0 +1,150 @@ +package main + +/* + * Starts an http server that responds with a hardcoded result to the binance API's + * "/sapi/v1/capital/config/getall" endpoint. Binance's testnet does not support the + * "sapi" endpoints, and this is the only "sapi" endpoint that we use. + */ + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "strconv" + "strings" + "sync" + "time" + + "decred.org/dcrdex/client/websocket" + "decred.org/dcrdex/dex" +) + +const ( + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 +) + +var ( + log = dex.StdOutLogger("TBNC", dex.LevelDebug) +) + +func main() { + if err := mainErr(); err != nil { + fmt.Fprint(os.Stderr, err, "\n") + os.Exit(1) + } + os.Exit(0) +} + +func mainErr() error { + f := &fakeBinance{ + wsServer: websocket.New(nil, log.SubLogger("WS")), + balances: map[string]*balance{ + "eth": { + free: 1000.123432, + locked: 0, + }, + "btc": { + free: 1000.21314123, + locked: 0, + }, + "ltc": { + free: 1000.8689444, + locked: 0, + }, + "bch": { + free: 1000.2358249, + locked: 0, + }, + "dcr": { + free: 1000.2358249, + locked: 0, + }, + }, + } + http.HandleFunc("/sapi/v1/capital/config/getall", f.handleWalletCoinsReq) + + return http.ListenAndServe(":37346", nil) +} + +type balance struct { + free float64 + locked float64 +} + +type fakeBinance struct { + wsServer *websocket.Server + + balanceMtx sync.RWMutex + balances map[string]*balance +} + +func (f *fakeBinance) handleWalletCoinsReq(w http.ResponseWriter, r *http.Request) { + ci := f.coinInfo() + writeJSONWithStatus(w, ci, http.StatusOK) +} + +type fakeBinanceNetworkInfo struct { + Coin string `json:"coin"` + MinConfirm int `json:"minConfirm"` + Network string `json:"network"` + UnLockConfirm int `json:"unLockConfirm"` + WithdrawEnable bool `json:"withdrawEnable"` + WithdrawFee string `json:"withdrawFee"` + WithdrawIntegerMultiple string `json:"withdrawIntegerMultiple"` + WithdrawMax string `json:"withdrawMax"` + WithdrawMin string `json:"withdrawMin"` +} + +type fakeBinanceCoinInfo struct { + Coin string `json:"coin"` + Free string `json:"free"` + Locked string `json:"locked"` + Withdrawing string `json:"withdrawing"` + NetworkList []*fakeBinanceNetworkInfo `json:"networkList"` +} + +func (f *fakeBinance) coinInfo() (coins []*fakeBinanceCoinInfo) { + f.balanceMtx.Lock() + for symbol, bal := range f.balances { + bigSymbol := strings.ToUpper(symbol) + coins = append(coins, &fakeBinanceCoinInfo{ + Coin: bigSymbol, + Free: strconv.FormatFloat(bal.free, 'f', 8, 64), + Locked: strconv.FormatFloat(bal.locked, 'f', 8, 64), + Withdrawing: "0", + NetworkList: []*fakeBinanceNetworkInfo{ + { + Coin: bigSymbol, + Network: bigSymbol, + MinConfirm: 1, + WithdrawEnable: true, + WithdrawFee: strconv.FormatFloat(0.00000800, 'f', 8, 64), + WithdrawIntegerMultiple: strconv.FormatFloat(0.00000001, 'f', 8, 64), + WithdrawMax: strconv.FormatFloat(1000, 'f', 8, 64), + WithdrawMin: strconv.FormatFloat(0.01, 'f', 8, 64), + }, + }, + }) + } + f.balanceMtx.Unlock() + return +} + +// writeJSON marshals the provided interface and writes the bytes to the +// ResponseWriter with the specified response code. +func writeJSONWithStatus(w http.ResponseWriter, thing interface{}, code int) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + b, err := json.Marshal(thing) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Errorf("JSON encode error: %v", err) + return + } + w.WriteHeader(code) + _, err = w.Write(append(b, byte('\n'))) + if err != nil { + log.Errorf("Write error: %v", err) + } +} diff --git a/client/comms/wsconn.go b/client/comms/wsconn.go index b3e3f495c7..d9da03efe1 100644 --- a/client/comms/wsconn.go +++ b/client/comms/wsconn.go @@ -139,6 +139,8 @@ type WsCfg struct { // RawHandler overrides the msgjson parsing and forwards all messages to // the provided function. RawHandler func([]byte) + + ConnectHeaders http.Header } // wsConn represents a client websocket connection. @@ -231,7 +233,7 @@ func (conn *wsConn) connect(ctx context.Context) error { dialer.Proxy = http.ProxyFromEnvironment } - ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, nil) + ws, _, err := dialer.DialContext(ctx, conn.cfg.URL, conn.cfg.ConnectHeaders) if err != nil { if isErrorInvalidCert(err) { conn.setConnectionStatus(InvalidCert) @@ -291,7 +293,6 @@ func (conn *wsConn) connect(ctx context.Context) error { } else { conn.read(ctx) } - }() return nil @@ -357,10 +358,6 @@ func (conn *wsConn) close() { func (conn *wsConn) readRaw(ctx context.Context) { for { - if ctx.Err() != nil { - return - } - // Lock since conn.ws may be set by connect. conn.wsMtx.Lock() ws := conn.ws @@ -368,6 +365,10 @@ func (conn *wsConn) readRaw(ctx context.Context) { // Block until a message is received or an error occurs. _, msgBytes, err := ws.ReadMessage() + // Drop the read error on context cancellation. + if ctx.Err() != nil { + return + } if err != nil { conn.handleReadError(err) return @@ -601,6 +602,7 @@ func (conn *wsConn) Request(msg *msgjson.Message, f func(*msgjson.Message)) erro // For example, to wait on a response or timeout: // // errChan := make(chan error, 1) +// // err := conn.RequestWithTimeout(reqMsg, func(msg *msgjson.Message) { // errChan <- msg.UnmarshalResult(responseStructPointer) // }, timeout, func() { diff --git a/client/core/trade.go b/client/core/trade.go index 177aacdcb9..cbabf3a7b7 100644 --- a/client/core/trade.go +++ b/client/core/trade.go @@ -659,7 +659,6 @@ func (t *trackedTrade) coreOrderInternal() *Order { counterConfs, int64(t.metaData.ToSwapConf), int64(mt.redemptionConfs), int64(mt.redemptionConfsReq))) } - corder.AllFeesConfirmed = allFeesConfirmed return corder diff --git a/client/mm/config.go b/client/mm/config.go index 1b5407f984..d9eb69d5af 100644 --- a/client/mm/config.go +++ b/client/mm/config.go @@ -11,11 +11,6 @@ import ( type MarketMakingWithCEXConfig struct { } -// ArbitrageConfig is the configuration for an arbitrage bot that only places -// when there is a profitable arbitrage opportunity. -type ArbitrageConfig struct { -} - type BalanceType uint8 const ( @@ -23,6 +18,16 @@ const ( Amount ) +// CEXConfig is a configuration for connecting to a CEX API. +type CEXConfig struct { + // CEXName is the name of the cex. + CEXName string `json:"cexName"` + // APIKey is the API key for the CEX. + APIKey string `json:"apiKey"` + // APISecret is the API secret for the CEX. + APISecret string `json:"apiSecret"` +} + // BotConfig is the configuration for a market making bot. // The balance fields are the initial amounts that will be reserved to use for // this bot. As the bot trades, the amounts reserved for it will be updated. @@ -40,7 +45,7 @@ type BotConfig struct { // Only one of the following configs should be set MMCfg *MarketMakingConfig `json:"marketMakingConfig,omitempty"` MMWithCEXCfg *MarketMakingWithCEXConfig `json:"marketMakingWithCEXConfig,omitempty"` - ArbCfg *ArbitrageConfig `json:"arbitrageConfig,omitempty"` + ArbCfg *SimpleArbConfig `json:"arbConfig,omitempty"` Disabled bool `json:"disabled"` } diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go new file mode 100644 index 0000000000..19c0596eea --- /dev/null +++ b/client/mm/libxc/binance.go @@ -0,0 +1,1245 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package libxc + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/client/comms" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/calc" + "decred.org/dcrdex/dex/encode" +) + +// Binance API spot trading docs: +// https://binance-docs.github.io/apidocs/spot/en/#spot-account-trade + +const ( + httpURL = "https://api.binance.com" + websocketURL = "wss://stream.binance.com:9443" + + usHttpURL = "https://api.binance.us" + usWebsocketURL = "wss://stream.binance.us:9443" + + testnetHttpURL = "https://testnet.binance.vision" + testnetWebsocketURL = "wss://testnet.binance.vision" + + // sapi endpoints are not implemented by binance's test network. This url + // connects to the process at client/cmd/testbinance, which responds to the + // /sapi/v1/capital/config/getall endpoint. + fakeBinanceURL = "http://localhost:37346" +) + +type bookBin struct { + Price float64 + Qty float64 +} + +type bncBook struct { + bids []*bookBin + asks []*bookBin + latestUpdate int64 + numSubscribers uint32 +} + +func newBNCBook() *bncBook { + return &bncBook{ + bids: make([]*bookBin, 0), + asks: make([]*bookBin, 0), + numSubscribers: 1, + } +} + +type bncAssetConfig struct { + // assetID is the bip id + assetID uint32 + // symbol is the DEX asset symbol, always lower case + symbol string + // coin is the asset symbol on binance, always upper case. + // For a token like USDC, the coin field will be USDC, but + // symbol field will be usdc.eth. + coin string + // chain will be the same as coin for the base assets of + // a blockchain, but for tokens it will be the chain + // that the token is hosted such as "ETH". + chain string + conversionFactor uint64 +} + +func bncSymbolData(symbol string) (*bncAssetConfig, error) { + coin := strings.ToUpper(symbol) + var ok bool + assetID, ok := dex.BipSymbolID(symbol) + if !ok { + return nil, fmt.Errorf("not id found for %q", symbol) + } + networkID := assetID + if token := asset.TokenInfo(assetID); token != nil { + networkID = token.ParentID + parts := strings.Split(symbol, ".") + coin = strings.ToUpper(parts[0]) + } + ui, err := asset.UnitInfo(assetID) + if err != nil { + return nil, fmt.Errorf("no unit info found for %d", assetID) + } + return &bncAssetConfig{ + assetID: assetID, + symbol: symbol, + coin: coin, + chain: strings.ToUpper(dex.BipIDSymbol(networkID)), + conversionFactor: ui.Conventional.ConversionFactor, + }, nil +} + +// bncBalance is the balance of an asset in conventional units. This must be +// converted before returning. +type bncBalance struct { + available float64 + locked float64 +} + +type binance struct { + log dex.Logger + url string + wsURL string + apiKey string + secretKey string + knownAssets map[uint32]bool + net dex.Network + tradeIDNonce atomic.Uint32 + tradeIDNoncePrefix dex.Bytes + + markets atomic.Value // map[string]*bnMarket + // tokenIDs maps the token's symbol to the list of bip ids of the token + // for each chain for which deposits and withdrawals are enabled on + // binance. + tokenIDs atomic.Value // map[string][]uint32 + + balanceMtx sync.RWMutex + balances map[string]*bncBalance + + marketStreamMtx sync.RWMutex + marketStream comms.WsConn + + booksMtx sync.RWMutex + books map[string]*bncBook + + tradeUpdaterMtx sync.RWMutex + tradeToUpdater map[string]int + tradeUpdaters map[int]chan *TradeUpdate + tradeUpdateCounter int + + cexUpdatersMtx sync.RWMutex + cexUpdaters map[chan interface{}]struct{} +} + +var _ CEX = (*binance)(nil) + +func newBinance(apiKey, secretKey string, log dex.Logger, net dex.Network, binanceUS bool) *binance { + url, wsURL := httpURL, websocketURL + if binanceUS { + url, wsURL = usHttpURL, usWebsocketURL + } + if net == dex.Testnet || net == dex.Simnet { + url, wsURL = testnetHttpURL, testnetWebsocketURL + } + + registeredAssets := asset.Assets() + knownAssets := make(map[uint32]bool, len(registeredAssets)) + for _, a := range registeredAssets { + knownAssets[a.ID] = true + } + + bnc := &binance{ + log: log, + url: url, + wsURL: wsURL, + apiKey: apiKey, + secretKey: secretKey, + knownAssets: knownAssets, + balances: make(map[string]*bncBalance), + books: make(map[string]*bncBook), + net: net, + tradeToUpdater: make(map[string]int), + tradeUpdaters: make(map[int]chan *TradeUpdate), + cexUpdaters: make(map[chan interface{}]struct{}, 0), + tradeIDNoncePrefix: encode.RandomBytes(10), + } + + bnc.markets.Store(make(map[string]*bnMarket)) + bnc.tokenIDs.Store(make(map[string][]uint32)) + + return bnc +} + +func (bnc *binance) updateBalances(coinsData []*binanceCoinInfo) { + bnc.balanceMtx.Lock() + defer bnc.balanceMtx.Unlock() + + for _, nfo := range coinsData { + bnc.balances[nfo.Coin] = &bncBalance{ + available: nfo.Free, + locked: nfo.Locked, + } + } +} + +func (bnc *binance) getCoinInfo(ctx context.Context) error { + coins := make([]*binanceCoinInfo, 0) + err := bnc.getAPI(ctx, "/sapi/v1/capital/config/getall", nil, true, true, &coins) + if err != nil { + return fmt.Errorf("error getting binance coin info: %v", err) + } + + bnc.updateBalances(coins) + + tokenIDs := make(map[string][]uint32) + for _, nfo := range coins { + tokenSymbol := strings.ToLower(nfo.Coin) + chainIDs, isToken := dex.TokenChains[tokenSymbol] + if !isToken { + continue + } + isSupportedChain := func(assetID uint32) (uint32, bool) { + for _, chainID := range chainIDs { + if chainID[1] == assetID { + return chainID[0], true + } + } + return 0, false + } + for _, netInfo := range nfo.NetworkList { + chainSymbol := strings.ToLower(netInfo.Network) + chainID, found := dex.BipSymbolID(chainSymbol) + if !found { + continue + } + if !netInfo.WithdrawEnable || !netInfo.DepositEnable { + bnc.log.Tracef("Skipping %s network %s because deposits and/or withdraws are not enabled.", tokenSymbol, chainSymbol) + continue + } + if tokenBipId, supported := isSupportedChain(chainID); supported { + tokenIDs[tokenSymbol] = append(tokenIDs[tokenSymbol], tokenBipId) + } + } + } + bnc.tokenIDs.Store(tokenIDs) + + return nil +} + +func (bnc *binance) getMarkets(ctx context.Context) error { + var exchangeInfo struct { + Timezone string `json:"timezone"` + ServerTime int64 `json:"serverTime"` + RateLimits []struct { + RateLimitType string `json:"rateLimitType"` + Interval string `json:"interval"` + IntervalNum int64 `json:"intervalNum"` + Limit int64 `json:"limit"` + } `json:"rateLimits"` + Symbols []*bnMarket `json:"symbols"` + } + err := bnc.getAPI(ctx, "/api/v3/exchangeInfo", nil, false, false, &exchangeInfo) + if err != nil { + return fmt.Errorf("error getting markets from Binance: %w", err) + } + + marketsMap := make(map[string]*bnMarket, len(exchangeInfo.Symbols)) + for _, market := range exchangeInfo.Symbols { + marketsMap[market.Symbol] = market + } + + bnc.markets.Store(marketsMap) + + return nil +} + +// Connect connects to the binance API. +func (bnc *binance) Connect(ctx context.Context) (*sync.WaitGroup, error) { + wg := new(sync.WaitGroup) + + if err := bnc.getCoinInfo(ctx); err != nil { + return nil, fmt.Errorf("error getting coin info: %v", err) + } + + if err := bnc.getMarkets(ctx); err != nil { + return nil, fmt.Errorf("error getting markets: %v", err) + } + + if err := bnc.getUserDataStream(ctx); err != nil { + return nil, err + } + + // Refresh the markets periodically. + wg.Add(1) + go func() { + defer wg.Done() + nextTick := time.After(time.Hour) + for { + select { + case <-nextTick: + err := bnc.getMarkets(ctx) + if err != nil { + bnc.log.Errorf("Error fetching markets: %v", err) + nextTick = time.After(time.Minute) + } else { + nextTick = time.After(time.Hour) + bnc.sendCexUpdateNotes() + } + case <-ctx.Done(): + return + } + } + }() + + // Refresh the coin info periodically. + wg.Add(1) + go func() { + defer wg.Done() + nextTick := time.After(time.Hour) + for { + select { + case <-nextTick: + err := bnc.getCoinInfo(ctx) + if err != nil { + bnc.log.Errorf("Error fetching markets: %v", err) + nextTick = time.After(time.Minute) + } else { + nextTick = time.After(time.Hour) + bnc.sendCexUpdateNotes() + } + case <-ctx.Done(): + return + } + } + }() + + return wg, nil +} + +// SubscribeCEXUpdates returns a channel which sends an empty struct when +// the balance of an asset on the CEX has been updated. +func (bnc *binance) SubscribeCEXUpdates() (<-chan interface{}, func()) { + updater := make(chan interface{}, 128) + bnc.cexUpdatersMtx.Lock() + bnc.cexUpdaters[updater] = struct{}{} + bnc.cexUpdatersMtx.Unlock() + + unsubscribe := func() { + bnc.cexUpdatersMtx.Lock() + delete(bnc.cexUpdaters, updater) + bnc.cexUpdatersMtx.Unlock() + } + + return updater, unsubscribe +} + +// Balance returns the balance of an asset at the CEX. +func (bnc *binance) Balance(symbol string) (*ExchangeBalance, error) { + assetConfig, err := bncSymbolData(symbol) + if err != nil { + return nil, err + } + + bnc.balanceMtx.RLock() + defer bnc.balanceMtx.RUnlock() + + bal, found := bnc.balances[assetConfig.coin] + if !found { + return nil, fmt.Errorf("no %q balance found", assetConfig.coin) + } + + return &ExchangeBalance{ + Available: uint64(math.Floor(bal.available * float64(assetConfig.conversionFactor))), + Locked: uint64(math.Floor(bal.locked * float64(assetConfig.conversionFactor))), + }, nil +} + +func (bnc *binance) generateTradeID() string { + nonce := bnc.tradeIDNonce.Add(1) + nonceB := encode.Uint32Bytes(nonce) + return hex.EncodeToString(append(bnc.tradeIDNoncePrefix, nonceB...)) +} + +// Trade executes a trade on the CEX. subscriptionID takes an ID returned from +// SubscribeTradeUpdates. +func (bnc *binance) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error) { + side := "BUY" + if sell { + side = "SELL" + } + + baseCfg, err := bncSymbolData(baseSymbol) + if err != nil { + return "", fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err) + } + + quoteCfg, err := bncSymbolData(quoteSymbol) + if err != nil { + return "", fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err) + } + + slug := baseCfg.coin + quoteCfg.coin + + marketsMap := bnc.markets.Load().(map[string]*bnMarket) + market, found := marketsMap[slug] + if !found { + return "", fmt.Errorf("market not found: %v", slug) + } + + price := calc.ConventionalRateAlt(rate, baseCfg.conversionFactor, quoteCfg.conversionFactor) + amt := float64(qty) / float64(baseCfg.conversionFactor) + tradeID := bnc.generateTradeID() + + v := make(url.Values) + v.Add("symbol", slug) + v.Add("side", side) + v.Add("type", "LIMIT") + v.Add("timeInForce", "GTC") + v.Add("newClientOrderId", tradeID) + v.Add("quantity", strconv.FormatFloat(amt, 'f', market.BaseAssetPrecision, 64)) + v.Add("price", strconv.FormatFloat(price, 'f', market.QuoteAssetPrecision, 64)) + + bnc.tradeUpdaterMtx.RLock() + _, found = bnc.tradeUpdaters[subscriptionID] + if !found { + bnc.tradeUpdaterMtx.RUnlock() + return "", fmt.Errorf("no trade updater with ID %v", subscriptionID) + } + bnc.tradeUpdaterMtx.RUnlock() + + err = bnc.postAPI(ctx, "/api/v3/order", v, nil, true, true, &struct{}{}) + if err != nil { + return "", err + } + + bnc.tradeUpdaterMtx.Lock() + defer bnc.tradeUpdaterMtx.Unlock() + bnc.tradeToUpdater[tradeID] = subscriptionID + + return tradeID, err +} + +// SubscribeTradeUpdates returns a channel that the caller can use to +// listen for updates to a trade's status. When the subscription ID +// returned from this function is passed as the updaterID argument to +// Trade, then updates to the trade will be sent on the updated channel +// returned from this function. +func (bnc *binance) SubscribeTradeUpdates() (<-chan *TradeUpdate, func(), int) { + bnc.tradeUpdaterMtx.Lock() + defer bnc.tradeUpdaterMtx.Unlock() + updaterID := bnc.tradeUpdateCounter + bnc.tradeUpdateCounter++ + updater := make(chan *TradeUpdate, 256) + bnc.tradeUpdaters[updaterID] = updater + + unsubscribe := func() { + bnc.tradeUpdaterMtx.Lock() + delete(bnc.tradeUpdaters, updaterID) + bnc.tradeUpdaterMtx.Unlock() + } + + return updater, unsubscribe, updaterID +} + +// CancelTrade cancels a trade on the CEX. +func (bnc *binance) CancelTrade(ctx context.Context, baseSymbol, quoteSymbol string, tradeID string) error { + baseCfg, err := bncSymbolData(baseSymbol) + if err != nil { + return fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err) + } + + quoteCfg, err := bncSymbolData(quoteSymbol) + if err != nil { + return fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err) + } + + slug := baseCfg.coin + quoteCfg.coin + + v := make(url.Values) + v.Add("symbol", slug) + v.Add("origClientOrderId", tradeID) + + req, err := bnc.generateRequest(ctx, "DELETE", "/api/v3/order", v, nil, true, true) + if err != nil { + return err + } + + return bnc.requestInto(req, &struct{}{}) +} + +func (bnc *binance) Balances() (map[uint32]*ExchangeBalance, error) { + bnc.balanceMtx.RLock() + defer bnc.balanceMtx.RUnlock() + + balances := make(map[uint32]*ExchangeBalance) + + for coin, bal := range bnc.balances { + assetConfig, err := bncSymbolData(strings.ToLower(coin)) + if err != nil { + continue + } + + balances[assetConfig.assetID] = &ExchangeBalance{ + Available: uint64(bal.available * float64(assetConfig.conversionFactor)), + Locked: uint64(bal.locked * float64(assetConfig.conversionFactor)), + } + } + + return balances, nil +} + +func (bnc *binance) Markets() ([]*Market, error) { + bnMarkets := bnc.markets.Load().(map[string]*bnMarket) + markets := make([]*Market, 0, 16) + tokenIDs := bnc.tokenIDs.Load().(map[string][]uint32) + for _, mkt := range bnMarkets { + markets = append(markets, mkt.dexMarkets(tokenIDs)...) + } + + return markets, nil +} + +func (bnc *binance) getAPI(ctx context.Context, endpoint string, query url.Values, key, sign bool, thing interface{}) error { + req, err := bnc.generateRequest(ctx, http.MethodGet, endpoint, query, nil, key, sign) + if err != nil { + return fmt.Errorf("generateRequest error: %w", err) + } + return bnc.requestInto(req, thing) +} + +func (bnc *binance) postAPI(ctx context.Context, endpoint string, query, form url.Values, key, sign bool, thing interface{}) error { + req, err := bnc.generateRequest(ctx, http.MethodPost, endpoint, query, form, key, sign) + if err != nil { + return fmt.Errorf("generateRequest error: %w", err) + } + return bnc.requestInto(req, thing) +} + +func (bnc *binance) requestInto(req *http.Request, thing interface{}) error { + bnc.log.Tracef("Sending request: %+v", req) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("httpClient.Do error: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http error (%d) %s", resp.StatusCode, resp.Status) + } + + if thing == nil { + return nil + } + // TODO: use buffered reader + reader := io.LimitReader(resp.Body, 1<<20) + r, err := io.ReadAll(reader) + if err != nil { + return err + } + + if err := json.Unmarshal(r, thing); err != nil { + return fmt.Errorf("json Decode error: %w", err) + } + return nil +} + +func (bnc *binance) generateRequest(ctx context.Context, method, endpoint string, query, form url.Values, key, sign bool) (*http.Request, error) { + var fullURL string + if (bnc.net == dex.Simnet || bnc.net == dex.Testnet) && strings.Contains(endpoint, "sapi") { + fullURL = fakeBinanceURL + endpoint + } else { + fullURL = bnc.url + endpoint + } + + if query == nil { + query = make(url.Values) + } + if sign { + query.Add("timestamp", strconv.FormatInt(time.Now().UnixMilli(), 10)) + } + queryString := query.Encode() + bodyString := form.Encode() + header := make(http.Header, 2) + body := bytes.NewBuffer(nil) + if bodyString != "" { + header.Set("Content-Type", "application/x-www-form-urlencoded") + body = bytes.NewBufferString(bodyString) + } + if key || sign { + header.Set("X-MBX-APIKEY", bnc.apiKey) + } + + if sign { + raw := queryString + bodyString + mac := hmac.New(sha256.New, []byte(bnc.secretKey)) + if _, err := mac.Write([]byte(raw)); err != nil { + return nil, fmt.Errorf("hmax Write error: %w", err) + } + v := url.Values{} + v.Set("signature", hex.EncodeToString(mac.Sum(nil))) + if queryString == "" { + queryString = v.Encode() + } else { + queryString = fmt.Sprintf("%s&%s", queryString, v.Encode()) + } + } + if queryString != "" { + fullURL = fmt.Sprintf("%s?%s", fullURL, queryString) + } + + req, err := http.NewRequestWithContext(ctx, method, fullURL, body) + if err != nil { + return nil, fmt.Errorf("NewRequestWithContext error: %w", err) + } + + req.Header = header + + return req, nil +} + +func (bnc *binance) getListenID(ctx context.Context) (string, error) { + var resp struct { + ListenKey string `json:"listenKey"` + } + return resp.ListenKey, bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp) +} + +type wsBalance struct { + Asset string `json:"a"` + Free float64 `json:"f,string"` + Locked float64 `json:"l,string"` +} + +type bncStreamUpdate struct { + Asset string `json:"a"` + EventType string `json:"e"` + ClientOrderID string `json:"c"` + CurrentOrderStatus string `json:"X"` + Balances []*wsBalance `json:"B"` + BalanceDelta float64 `json:"d,string"` + Filled float64 `json:"z,string"` + OrderQty float64 `json:"q,string"` + CancelledOrderID string `json:"C"` + E json.RawMessage `json:"E"` +} + +func decodeStreamUpdate(b []byte) (*bncStreamUpdate, error) { + var msg *bncStreamUpdate + if err := json.Unmarshal(b, &msg); err != nil { + return nil, err + } + return msg, nil +} + +func (bnc *binance) sendCexUpdateNotes() { + bnc.cexUpdatersMtx.RLock() + defer bnc.cexUpdatersMtx.RUnlock() + for updater := range bnc.cexUpdaters { + updater <- struct{}{} + } +} + +func (bnc *binance) handleOutboundAccountPosition(update *bncStreamUpdate) { + bnc.log.Tracef("Received outboundAccountPosition: %+v", update) + for _, bal := range update.Balances { + bnc.log.Tracef("balance: %+v", bal) + } + + bnc.balanceMtx.Lock() + for _, bal := range update.Balances { + symbol := strings.ToLower(bal.Asset) + bnc.balances[symbol] = &bncBalance{ + available: bal.Free, + locked: bal.Locked, + } + } + bnc.balanceMtx.Unlock() + bnc.sendCexUpdateNotes() +} + +func (bnc *binance) getTradeUpdater(tradeID string) (chan *TradeUpdate, error) { + bnc.tradeUpdaterMtx.RLock() + defer bnc.tradeUpdaterMtx.RUnlock() + + updaterID, found := bnc.tradeToUpdater[tradeID] + if !found { + return nil, fmt.Errorf("updater not found for trade ID %v", tradeID) + } + updater, found := bnc.tradeUpdaters[updaterID] + if !found { + return nil, fmt.Errorf("no updater with ID %v", tradeID) + } + + return updater, nil +} + +func (bnc *binance) removeTradeUpdater(tradeID string) { + bnc.tradeUpdaterMtx.RLock() + defer bnc.tradeUpdaterMtx.RUnlock() + delete(bnc.tradeToUpdater, tradeID) +} + +func (bnc *binance) handleExecutionReport(update *bncStreamUpdate) { + bnc.log.Tracef("Received executionReport: %+v", update) + + status := update.CurrentOrderStatus + var id string + if status == "CANCELED" { + id = update.CancelledOrderID + } else { + id = update.ClientOrderID + } + + updater, err := bnc.getTradeUpdater(id) + if err != nil { + bnc.log.Errorf("Error getting trade updater: %v", err) + return + } + + complete := status == "FILLED" || status == "CANCELED" || status == "REJECTED" || status == "EXPIRED" + updater <- &TradeUpdate{ + TradeID: id, + Complete: complete, + } + + if complete { + bnc.removeTradeUpdater(id) + } +} + +func (bnc *binance) getUserDataStream(ctx context.Context) (err error) { + streamHandler := func(b []byte) { + u, err := decodeStreamUpdate(b) + if err != nil { + bnc.log.Errorf("Error unmarshaling user stream update: %v", err) + bnc.log.Errorf("Raw message: %s", string(b)) + return + } + switch u.EventType { + case "outboundAccountPosition": + bnc.handleOutboundAccountPosition(u) + case "executionReport": + bnc.handleExecutionReport(u) + case "balanceUpdate": + // TODO: check if we need this.. is outbound account position enough? + bnc.log.Tracef("Received balanceUpdate: %s", string(b)) + } + } + + var listenKey string + newConn := func() (*dex.ConnectionMaster, error) { + listenKey, err = bnc.getListenID(ctx) + if err != nil { + return nil, err + } + + hdr := make(http.Header) + hdr.Set("X-MBX-APIKEY", bnc.apiKey) + conn, err := comms.NewWsConn(&comms.WsCfg{ + URL: bnc.wsURL + "/ws/" + listenKey, + + // Is this necessary for user data stream?? + PingWait: time.Minute * 4, + + ReconnectSync: func() { + bnc.log.Debugf("Binance reconnected") + }, + ConnectEventFunc: func(cs comms.ConnectionStatus) {}, + Logger: bnc.log.SubLogger("BNCWS"), + RawHandler: streamHandler, + ConnectHeaders: hdr, + }) + if err != nil { + return nil, err + } + + cm := dex.NewConnectionMaster(conn) + if err = cm.ConnectOnce(ctx); err != nil { + return nil, fmt.Errorf("user data stream connection error: %v", err) + } + + return cm, nil + } + + cm, err := newConn() + if err != nil { + return fmt.Errorf("error initializing connection: %v", err) + } + + go func() { + // A single connection to stream.binance.com is only valid for 24 hours; + // expect to be disconnected at the 24 hour mark. + reconnect := time.After(time.Hour * 12) + // Keepalive a user data stream to prevent a time out. User data streams + // will close after 60 minutes. It's recommended to send a ping about + // every 30 minutes. + keepAlive := time.NewTicker(time.Minute * 30) + defer keepAlive.Stop() + + connected := true // do not keep alive on a failed connection + for { + select { + case <-reconnect: + if cm != nil { + cm.Disconnect() + } + cm, err = newConn() + if err != nil { + connected = false + bnc.log.Errorf("Error reconnecting: %v", err) + reconnect = time.After(time.Second * 30) + } else { + connected = true + reconnect = time.After(time.Hour * 12) + } + case <-keepAlive.C: + if !connected { + continue + } + q := make(url.Values) + q.Add("listenKey", listenKey) + // Doing a PUT on a listenKey will extend its validity for 60 minutes. + req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false) + if err != nil { + bnc.log.Errorf("Error generating keep-alive request: %v", err) + continue + } + if err := bnc.requestInto(req, nil); err != nil { + bnc.log.Errorf("Error sending keep-alive request: %v", err) + } + case <-ctx.Done(): + return + } + } + }() + return nil +} + +var subscribeID uint64 + +type bncBookBinUpdate struct { + LastUpdateID uint64 `json:"lastUpdateId"` + Bids [][2]json.Number `json:"bids"` + Asks [][2]json.Number `json:"asks"` +} + +type bncBookNote struct { + StreamName string `json:"stream"` + Data *bncBookBinUpdate `json:"data"` +} + +func bncParseBookUpdates(pts [][2]json.Number) ([]*bookBin, error) { + bins := make([]*bookBin, 0, len(pts)) + for _, nums := range pts { + price, err := nums[0].Float64() + if err != nil { + return nil, fmt.Errorf("error parsing price: %v", err) + } + qty, err := nums[1].Float64() + if err != nil { + return nil, fmt.Errorf("error quantity qty: %v", err) + } + bins = append(bins, &bookBin{ + Price: price, + Qty: qty, + }) + } + return bins, nil +} + +func (bnc *binance) storeMarketStream(conn comms.WsConn) { + bnc.marketStreamMtx.Lock() + bnc.marketStream = conn + bnc.marketStreamMtx.Unlock() +} + +func (bnc *binance) subUnsubDepth(conn comms.WsConn, method, slug string) error { + req := &struct { + Method string `json:"method"` + Params []string `json:"params"` + ID uint64 `json:"id"` + }{ + Method: method, + Params: []string{ + slug + "@depth20", + }, + ID: atomic.AddUint64(&subscribeID, 1), + } + + b, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("error marshaling subscription stream request: %w", err) + } + + bnc.log.Debugf("Sending %v for market %v", method, slug) + if err := conn.SendRaw(b); err != nil { + return fmt.Errorf("error sending subscription stream request: %w", err) + } + + return nil +} + +func (bnc *binance) stopMarketDataStream(slug string) (err error) { + bnc.marketStreamMtx.RLock() + conn := bnc.marketStream + bnc.marketStreamMtx.RUnlock() + if conn == nil { + return fmt.Errorf("can't unsubscribe. no stream - %p", bnc) + } + + var unsubscribe bool + + bnc.booksMtx.Lock() + defer func() { + bnc.booksMtx.Unlock() + if unsubscribe { + if err := bnc.subUnsubDepth(conn, "UNSUBSCRIBE", slug); err != nil { + bnc.log.Errorf("subUnsubDepth(UNSUBSCRIBE): %v", err) + } + } + }() + + book, found := bnc.books[slug] + if !found { + unsubscribe = true + return nil + } + + book.numSubscribers-- + if book.numSubscribers == 0 { + unsubscribe = true + delete(bnc.books, slug) + } + + return nil +} + +func (bnc *binance) startMarketDataStream(ctx context.Context, baseSymbol, quoteSymbol string) (err error) { + slug := strings.ToLower(baseSymbol + quoteSymbol) + + bnc.marketStreamMtx.Lock() + defer bnc.marketStreamMtx.Unlock() + + // If a market stream already exists, just subscribe to this market. + if bnc.marketStream != nil { + bnc.booksMtx.Lock() + _, exists := bnc.books[slug] + if !exists { + bnc.books[slug] = newBNCBook() + } else { + bnc.books[slug].numSubscribers++ + } + bnc.booksMtx.Unlock() + + if err := bnc.subUnsubDepth(bnc.marketStream, "SUBSCRIBE", slug); err != nil { + return fmt.Errorf("subUnsubDepth(SUBSCRIBE): %v", err) + } + return nil + } + + streamHandler := func(b []byte) { + var note *bncBookNote + if err := json.Unmarshal(b, ¬e); err != nil { + bnc.log.Errorf("Error unmarshaling book note: %v", err) + return + } + + if note == nil || note.Data == nil { + bnc.log.Debugf("No data in %q update: %+v", slug, note) + return + } + + parts := strings.Split(note.StreamName, "@") + if len(parts) != 2 || parts[1] != "depth20" { + bnc.log.Errorf("Unknown stream name %q", note.StreamName) + return + } + slug = parts[0] // will be lower-case + + bnc.log.Tracef("Received %d bids and %d asks in a book update for %s", len(note.Data.Bids), len(note.Data.Asks), slug) + + bids, err := bncParseBookUpdates(note.Data.Bids) + if err != nil { + bnc.log.Errorf("Error parsing bid updates: %v", err) + return + } + + asks, err := bncParseBookUpdates(note.Data.Asks) + if err != nil { + bnc.log.Errorf("Error parsing ask updates: %v", err) + return + } + + bnc.booksMtx.Lock() + defer bnc.booksMtx.Unlock() + book := bnc.books[slug] + if book == nil { + bnc.log.Errorf("No book for stream %q", slug) + return + } + + book.latestUpdate = time.Now().Unix() + book.asks = asks + book.bids = bids + } + + newConn := func() (comms.WsConn, *dex.ConnectionMaster, error) { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + bnc.log.Debugf("Creating a new Binance market stream handler") + // Get a list of current subscriptions so we can restart them all. + bnc.booksMtx.Lock() + bnc.books[slug] = newBNCBook() + bnc.booksMtx.Unlock() + + streamName := slug + "@depth20" + addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, streamName) + + // Need to send key but not signature + conn, err := comms.NewWsConn(&comms.WsCfg{ + URL: addr, + // The websocket server will send a ping frame every 3 minutes. If the + // websocket server does not receive a pong frame back from the + // connection within a 10 minute period, the connection will be + // disconnected. Unsolicited pong frames are allowed. + PingWait: time.Minute * 4, + ReconnectSync: func() { + bnc.log.Debugf("Binance reconnected") + }, + ConnectEventFunc: func(cs comms.ConnectionStatus) {}, + Logger: bnc.log.SubLogger("BNCBOOK"), + RawHandler: streamHandler, + }) + if err != nil { + return nil, nil, err + } + + cm := dex.NewConnectionMaster(conn) + if err = cm.ConnectOnce(ctx); err != nil { + return nil, nil, fmt.Errorf("websocketHandler remote connect: %v", err) + } + + return conn, cm, nil + } + + conn, cm, err := newConn() + if err != nil { + return fmt.Errorf("error initializing connection: %v", err) + } + bnc.marketStream = conn + + go func() { + defer func() { + bnc.storeMarketStream(nil) + if cm != nil { + cm.Disconnect() + } + }() + + reconnect := time.After(time.Hour * 12) + for { + select { + case <-reconnect: + if cm != nil { + cm.Disconnect() + } + conn, cm, err = newConn() + if err != nil { + bnc.log.Errorf("Error reconnecting: %v", err) + reconnect = time.After(time.Second * 30) + } else { + bnc.storeMarketStream(conn) + reconnect = time.After(time.Hour * 12) + } + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +// UnsubscribeMarket unsubscribes from order book updates on a market. +func (bnc *binance) UnsubscribeMarket(baseSymbol, quoteSymbol string) { + bnc.stopMarketDataStream(strings.ToLower(baseSymbol + quoteSymbol)) +} + +// SubscribeMarket subscribes to order book updates on a market. This must +// be called before calling VWAP. +func (bnc *binance) SubscribeMarket(ctx context.Context, baseSymbol, quoteSymbol string) error { + return bnc.startMarketDataStream(ctx, baseSymbol, quoteSymbol) +} + +// VWAP returns the volume weighted average price for a certain quantity +// of the base asset on a market. +func (bnc *binance) VWAP(baseSymbol, quoteSymbol string, sell bool, qty uint64) (avgPrice, extrema uint64, filled bool, err error) { + fail := func(err error) (uint64, uint64, bool, error) { + return 0, 0, false, err + } + + slug := strings.ToLower(baseSymbol + quoteSymbol) + var side []*bookBin + var latestUpdate int64 + bnc.booksMtx.RLock() + book, found := bnc.books[slug] + if found { + latestUpdate = book.latestUpdate + if sell { + side = book.asks + } else { + side = book.bids + } + } + bnc.booksMtx.RUnlock() + if side == nil { + return fail(fmt.Errorf("no book found for %s", slug)) + } + + if latestUpdate < time.Now().Unix()-60 { + return fail(fmt.Errorf("book for %s is stale", slug)) + } + + baseCfg, err := bncSymbolData(baseSymbol) + if err != nil { + return fail(fmt.Errorf("error getting symbol data for %s: %w", baseSymbol, err)) + } + + quoteCfg, err := bncSymbolData(quoteSymbol) + if err != nil { + return fail(fmt.Errorf("error getting symbol data for %s: %w", quoteSymbol, err)) + } + + remaining := qty + var weightedSum uint64 + for _, bin := range side { + extrema = calc.MessageRateAlt(bin.Price, baseCfg.conversionFactor, quoteCfg.conversionFactor) + binQty := uint64(bin.Qty * float64(baseCfg.conversionFactor)) + if binQty >= remaining { + filled = true + weightedSum += remaining * extrema + break + } + remaining -= binQty + weightedSum += binQty * extrema + } + + if !filled { + return 0, 0, false, nil + } + + return weightedSum / qty, extrema, true, nil +} + +type binanceNetworkInfo struct { + AddressRegex string `json:"addressRegex"` + Coin string `json:"coin"` + DepositEnable bool `json:"depositEnable"` + IsDefault bool `json:"isDefault"` + MemoRegex string `json:"memoRegex"` + MinConfirm int `json:"minConfirm"` + Name string `json:"name"` + Network string `json:"network"` + ResetAddressStatus bool `json:"resetAddressStatus"` + SpecialTips string `json:"specialTips"` + UnLockConfirm int `json:"unLockConfirm"` + WithdrawEnable bool `json:"withdrawEnable"` + WithdrawFee float64 `json:"withdrawFee,string"` + WithdrawIntegerMultiple float64 `json:"withdrawIntegerMultiple,string"` + WithdrawMax float64 `json:"withdrawMax,string"` + WithdrawMin float64 `json:"withdrawMin,string"` + SameAddress bool `json:"sameAddress"` + EstimatedArrivalTime int `json:"estimatedArrivalTime"` + Busy bool `json:"busy"` +} + +type binanceCoinInfo struct { + Coin string `json:"coin"` + DepositAllEnable bool `json:"depositAllEnable"` + Free float64 `json:"free,string"` + Freeze float64 `json:"freeze,string"` + Ipoable float64 `json:"ipoable,string"` + Ipoing float64 `json:"ipoing,string"` + IsLegalMoney bool `json:"isLegalMoney"` + Locked float64 `json:"locked,string"` + Name string `json:"name"` + Storage float64 `json:"storage,string"` + Trading bool `json:"trading"` + WithdrawAllEnable bool `json:"withdrawAllEnable"` + Withdrawing float64 `json:"withdrawing,string"` + NetworkList []*binanceNetworkInfo `json:"networkList"` +} + +type bnMarket struct { + Symbol string `json:"symbol"` + Status string `json:"status"` + BaseAsset string `json:"baseAsset"` + BaseAssetPrecision int `json:"baseAssetPrecision"` + QuoteAsset string `json:"quoteAsset"` + QuoteAssetPrecision int `json:"quoteAssetPrecision"` + OrderTypes []string `json:"orderTypes"` +} + +// dexMarkets returns all the possible markets for this symbol. A symbol +// represents a single market on the CEX, but tokens on the DEX have a +// different assetID for each network they are on, therefore they will +// match multiple markets as defined using assetID. +func (s *bnMarket) dexMarkets(tokenIDs map[string][]uint32) []*Market { + var baseAssetIDs, quoteAssetIDs []uint32 + + getAssetIDs := func(coin string) []uint32 { + symbol := strings.ToLower(coin) + if assetID, found := dex.BipSymbolID(symbol); found { + return []uint32{assetID} + } + + if tokenIDs, found := tokenIDs[symbol]; found { + return tokenIDs + } + + return nil + } + + baseAssetIDs = getAssetIDs(s.BaseAsset) + if len(baseAssetIDs) == 0 { + return nil + } + + quoteAssetIDs = getAssetIDs(s.QuoteAsset) + if len(quoteAssetIDs) == 0 { + return nil + } + + markets := make([]*Market, 0, len(baseAssetIDs)*len(quoteAssetIDs)) + for _, baseID := range baseAssetIDs { + for _, quoteID := range quoteAssetIDs { + markets = append(markets, &Market{ + BaseID: baseID, + QuoteID: quoteID, + }) + } + } + return markets +} diff --git a/client/mm/libxc/binance_live_test.go b/client/mm/libxc/binance_live_test.go new file mode 100644 index 0000000000..4210aaee18 --- /dev/null +++ b/client/mm/libxc/binance_live_test.go @@ -0,0 +1,234 @@ +//go:build bnclive + +package libxc + +import ( + "context" + "os" + "os/user" + "sync" + "testing" + "time" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/dex" +) + +var ( + log = dex.StdOutLogger("T", dex.LevelTrace) + u, _ = user.Current() + apiKey = "" + apiSecret = "" +) + +func TestMain(m *testing.M) { + if s := os.Getenv("SECRET"); s != "" { + apiSecret = s + } + if k := os.Getenv("KEY"); k != "" { + apiKey = k + } + + m.Run() +} + +func tNewBinance(t *testing.T, network dex.Network) *binance { + return newBinance(apiKey, apiSecret, log, network, true) +} + +type spoofDriver struct { + cFactor uint64 +} + +func (drv *spoofDriver) Open(*asset.WalletConfig, dex.Logger, dex.Network) (asset.Wallet, error) { + return nil, nil +} + +func (drv *spoofDriver) DecodeCoinID(coinID []byte) (string, error) { + return "", nil +} + +func (drv *spoofDriver) Info() *asset.WalletInfo { + return &asset.WalletInfo{ + UnitInfo: dex.UnitInfo{ + Conventional: dex.Denomination{ + ConversionFactor: drv.cFactor, + }, + }, + } +} + +func init() { + asset.Register(42, &spoofDriver{cFactor: 1e9}) // dcr + asset.Register(60, &spoofDriver{cFactor: 1e9}) // eth + asset.Register(966, &spoofDriver{cFactor: 1e9}) // matic + asset.Register(0, &spoofDriver{cFactor: 1e8}) // btc + asset.RegisterToken(60001, &dex.Token{ + ParentID: 60, + Name: "USDC", + UnitInfo: dex.UnitInfo{ + Conventional: dex.Denomination{ + ConversionFactor: 1e6, + }, + }, + }, &asset.WalletDefinition{}, dex.Mainnet, dex.Testnet, dex.Simnet) +} + +func TestConnect(t *testing.T) { + bnc := tNewBinance(t, dex.Simnet) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) + defer cancel() + + _, err := bnc.Connect(ctx) + if err != nil { + t.Fatalf("Connect error: %v", err) + } + + balance, err := bnc.Balance("eth") + if err != nil { + t.Fatalf("Balance error: %v", err) + } + t.Logf("usdc balance: %v", balance) + + balance, err = bnc.Balance("btc") + if err != nil { + t.Fatalf("Balance error: %v", err) + } + t.Logf("btc balance: %v", balance) +} + +// This may fail due to balance being to low. You can try switching the side +// of the trade or the qty. +func TestTrade(t *testing.T) { + bnc := tNewBinance(t, dex.Simnet) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) + defer cancel() + _, err := bnc.Connect(ctx) + if err != nil { + t.Fatalf("Connect error: %v", err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + updates, unsubscribe, updaterID := bnc.SubscribeTradeUpdates() + defer unsubscribe() + go func() { + defer wg.Done() + for { + select { + case tradeUpdate := <-updates: + t.Logf("Trade Update: %+v", tradeUpdate) + if tradeUpdate.Complete { + // Sleep because context might get cancelled before + // Trade returns. + time.Sleep(1 * time.Second) + cancel() + return + } + case <-ctx.Done(): + return + } + } + }() + tradeID, err := bnc.Trade(ctx, "eth", "btc", false, 6000e2, 1e7, updaterID) + if err != nil { + t.Fatalf("trade error: %v", err) + } + + if true { // Cancel the trade + time.Sleep(1 * time.Second) + err = bnc.CancelTrade(ctx, "eth", "btc", tradeID) + if err != nil { + t.Fatalf("error cancelling trade: %v", err) + } + } + + wg.Wait() +} + +func TestCancelTrade(t *testing.T) { + tradeID := "42641326270691d752e000000001" + + bnc := tNewBinance(t, dex.Testnet) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) + defer cancel() + _, err := bnc.Connect(ctx) + if err != nil { + t.Fatalf("Connect error: %v", err) + } + + err = bnc.CancelTrade(ctx, "eth", "btc", tradeID) + if err != nil { + t.Fatalf("error cancelling trade: %v", err) + } +} + +func TestMarkets(t *testing.T) { + bnc := tNewBinance(t, dex.Mainnet) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) + defer cancel() + + _, err := bnc.Connect(ctx) + if err != nil { + t.Fatalf("Connect error: %v", err) + } + + markets, err := bnc.Markets() + if err != nil { + t.Fatalf("failed to load markets") + } + + for _, market := range markets { + t.Logf("%v - %v", dex.BipIDSymbol(market.BaseID), dex.BipIDSymbol(market.QuoteID)) + } +} + +func TestVWAP(t *testing.T) { + bnc := tNewBinance(t, dex.Testnet) + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*23) + defer cancel() + _, err := bnc.Connect(ctx) + if err != nil { + t.Fatalf("Connect error: %v", err) + } + + err = bnc.SubscribeMarket(ctx, "eth", "btc") + if err != nil { + t.Fatalf("failed to subscribe to market: %v", err) + } + + time.Sleep(10 * time.Second) + avg, extrema, filled, err := bnc.VWAP("eth", "btc", true, 2e9) + if err != nil { + t.Fatalf("VWAP failed: %v", err) + } + + t.Logf("avg: %v, extrema: %v, filled: %v", avg, extrema, filled) + + err = bnc.SubscribeMarket(ctx, "eth", "btc") + if err != nil { + t.Fatalf("failed to subscribe to market: %v", err) + } + time.Sleep(2 * time.Second) + + avg, extrema, filled, err = bnc.VWAP("eth", "btc", true, 2e9) + if err != nil { + t.Fatalf("VWAP failed: %v", err) + } + + t.Logf("avg: %v, extrema: %v, filled: %v", avg, extrema, filled) + + bnc.UnsubscribeMarket("eth", "btc") + + avg, extrema, filled, err = bnc.VWAP("eth", "btc", true, 2e9) + if err != nil { + t.Fatalf("VWAP failed: %v", err) + } + + t.Logf("avg: %v, extrema: %v, filled: %v", avg, extrema, filled) + + bnc.UnsubscribeMarket("eth", "btc") + if err != nil { + t.Fatalf("error unsubscribing market") + } +} diff --git a/client/mm/libxc/binance_test.go b/client/mm/libxc/binance_test.go new file mode 100644 index 0000000000..d8dd463c3f --- /dev/null +++ b/client/mm/libxc/binance_test.go @@ -0,0 +1,40 @@ +package libxc + +import ( + "testing" +) + +func TestSubscribeCEXUpdates(t *testing.T) { + bn := &binance{ + cexUpdaters: make(map[chan interface{}]struct{}), + } + _, unsub0 := bn.SubscribeCEXUpdates() + bn.SubscribeCEXUpdates() + unsub0() + bn.SubscribeCEXUpdates() + if len(bn.cexUpdaters) != 2 { + t.Fatalf("wrong number of updaters. wanted 2, got %d", len(bn.cexUpdaters)) + } +} + +func TestSubscribeTradeUpdates(t *testing.T) { + bn := &binance{ + tradeUpdaters: make(map[int]chan *TradeUpdate), + } + _, unsub0, _ := bn.SubscribeTradeUpdates() + _, _, id1 := bn.SubscribeTradeUpdates() + unsub0() + _, _, id2 := bn.SubscribeTradeUpdates() + if len(bn.tradeUpdaters) != 2 { + t.Fatalf("wrong number of updaters. wanted 2, got %d", len(bn.tradeUpdaters)) + } + if id1 == id2 { + t.Fatalf("ids should be unique. got %d twice", id1) + } + if _, found := bn.tradeUpdaters[id1]; !found { + t.Fatalf("id1 not found") + } + if _, found := bn.tradeUpdaters[id2]; !found { + t.Fatalf("id2 not found") + } +} diff --git a/client/mm/libxc/interface.go b/client/mm/libxc/interface.go new file mode 100644 index 0000000000..cf4268add1 --- /dev/null +++ b/client/mm/libxc/interface.go @@ -0,0 +1,87 @@ +package libxc + +import ( + "context" + "fmt" + + "decred.org/dcrdex/dex" +) + +// ExchangeBalance holds the available and locked balances of an asset +// on a CEX. +type ExchangeBalance struct { + Available uint64 `json:"available"` + Locked uint64 `json:"locked"` +} + +// TradeUpdate is a notification sent when the status of a trade on the CEX +// has been updated. +type TradeUpdate struct { + TradeID string + Complete bool // cancelled or filled +} + +// Market is the base and quote assets of a market on a CEX. +type Market struct { + BaseID uint32 `json:"base"` + QuoteID uint32 `json:"quote"` +} + +// CEX implements a set of functions that can be used to interact with a +// centralized exchange's spot trading API. All rates and quantities +// when interacting with the CEX interface will adhere to the standard +// rates and quantities of the DEX. +type CEX interface { + dex.Connector + // Balance returns the balance of an asset at the CEX. + Balance(symbol string) (*ExchangeBalance, error) + // Balances returns a list of all asset balances at the CEX. Only assets that are + // registered in the DEX client will be returned. + Balances() (map[uint32]*ExchangeBalance, error) + // CancelTrade cancels a trade on the CEX. + CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error + // Markets returns the list of markets at the CEX. + Markets() ([]*Market, error) + // SubscribeCEXUpdates returns a channel which sends an empty struct when + // the balance of an asset on the CEX has been updated. + SubscribeCEXUpdates() (updates <-chan interface{}, unsubscribe func()) + // SubscribeMarket subscribes to order book updates on a market. This must + // be called before calling VWAP. + SubscribeMarket(ctx context.Context, baseSymbol, quoteSymbol string) error + // SubscribeTradeUpdates returns a channel that the caller can use to + // listen for updates to a trade's status. When the subscription ID + // returned from this function is passed as the updaterID argument to + // Trade, then updates to the trade will be sent on the updated channel + // returned from this function. + SubscribeTradeUpdates() (updates <-chan *TradeUpdate, unsubscribe func(), subscriptionID int) + // Trade executes a trade on the CEX. updaterID takes a subscriptionID + // returned from SubscribeTradeUpdates. + Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, subscriptionID int) (string, error) + // UnsubscribeMarket unsubscribes from order book updates on a market. + UnsubscribeMarket(baseSymbol, quoteSymbol string) + // VWAP returns the volume weighted average price for a certain quantity + // of the base asset on a market. + VWAP(baseSymbol, quoteSymbol string, sell bool, qty uint64) (vwap, extrema uint64, filled bool, err error) +} + +const ( + Binance = "Binance" + BinanceUS = "BinanceUS" +) + +// IsValidCEXName returns whether or not a cex name is supported. +func IsValidCexName(cexName string) bool { + return cexName == Binance || cexName == BinanceUS +} + +// NewCEX creates a new CEX. +func NewCEX(cexName string, apiKey, secretKey string, log dex.Logger, net dex.Network) (CEX, error) { + switch cexName { + case Binance: + return newBinance(apiKey, secretKey, log, net, false), nil + case BinanceUS: + return newBinance(apiKey, secretKey, log, net, true), nil + default: + return nil, fmt.Errorf("unrecognized CEX: %v", cexName) + } +} diff --git a/client/mm/mm.go b/client/mm/mm.go index 0968780fc1..05ef9ae366 100644 --- a/client/mm/mm.go +++ b/client/mm/mm.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "decred.org/dcrdex/client/core" + "decred.org/dcrdex/client/mm/libxc" "decred.org/dcrdex/client/orderbook" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/calc" @@ -45,6 +46,7 @@ var _ clientCore = (*core.Core)(nil) // Avoids having to mock the entire orderbook in tests. type dexOrderBook interface { MidGap() (uint64, error) + VWAP(lots, lotSize uint64, sell bool) (avg, extrema uint64, filled bool, err error) } var _ dexOrderBook = (*orderbook.OrderBook)(nil) @@ -729,7 +731,7 @@ func (m *MarketMaker) handleNotification(n core.Notification) { } // Run starts the MarketMaker. There can only be one BotConfig per dex market. -func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) error { +func (m *MarketMaker) Run(ctx context.Context, botCfgs []*BotConfig, cexCfgs []*CEXConfig, pw []byte) error { if !m.running.CompareAndSwap(false, true) { return errors.New("market making is already running") } @@ -743,7 +745,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err m.ctx, m.die = context.WithCancel(ctx) - enabledCfgs, err := validateAndFilterEnabledConfigs(cfgs) + enabledCfgs, err := validateAndFilterEnabledConfigs(botCfgs) if err != nil { return err } @@ -762,6 +764,8 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err } user := m.core.User() + cexes := make(map[string]libxc.CEX) + cexCMs := make(map[string]*dex.ConnectionMaster) startedMarketMaking = true @@ -784,12 +788,47 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err } }() - // Start each bot. + var cexCfgMap map[string]*CEXConfig + if len(cexCfgs) > 0 { + cexCfgMap = make(map[string]*CEXConfig, len(cexCfgs)) + for _, cexCfg := range cexCfgs { + cexCfgMap[cexCfg.CEXName] = cexCfg + } + } + + getConnectedCEX := func(cexName string) (libxc.CEX, error) { + var cex libxc.CEX + var found bool + if cex, found = cexes[cexName]; !found { + cexCfg := cexCfgMap[cexName] + if cexCfg == nil { + return nil, fmt.Errorf("no CEX config provided for %s", cexName) + } + logger := m.log.SubLogger(fmt.Sprintf("CEX-%s", cexName)) + cex, err = libxc.NewCEX(cexName, cexCfg.APIKey, cexCfg.APISecret, logger, dex.Simnet) + if err != nil { + return nil, fmt.Errorf("failed to create CEX: %v", err) + } + cm := dex.NewConnectionMaster(cex) + if err != nil { + return nil, fmt.Errorf("failed to connect to CEX: %v", err) + } + cexCMs[cexName] = cm + err = cm.Connect(m.ctx) + if err != nil { + return nil, fmt.Errorf("failed to connect to CEX: %v", err) + } + cexes[cexName] = cex + } + return cex, nil + } + for _, cfg := range enabledCfgs { switch { case cfg.MMCfg != nil: wg.Add(1) go func(cfg *BotConfig) { + defer wg.Done() logger := m.log.SubLogger(fmt.Sprintf("MarketMaker-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) var baseFiatRate, quoteFiatRate float64 @@ -798,16 +837,31 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err quoteFiatRate = user.FiatRates[cfg.QuoteAsset] } RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger) - wg.Done() + }(cfg) + case cfg.ArbCfg != nil: + wg.Add(1) + go func(cfg *BotConfig) { + defer wg.Done() + logger := m.log.SubLogger(fmt.Sprintf("Arbitrage-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)) + cex, err := getConnectedCEX(cfg.ArbCfg.CEXName) + if err != nil { + logger.Errorf("failed to connect to CEX: %v", err) + return + } + RunSimpleArbBot(m.ctx, cfg, m.core, cex, logger) }(cfg) default: - m.log.Errorf("Only basic market making is supported at this time. Skipping %s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) + m.log.Errorf("No bot config provided. Skipping %s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) } } go func() { wg.Wait() - m.log.Infof("All bots have stopped running.") + for cexName, cm := range cexCMs { + m.log.Infof("Shutting down connection to %s", cexName) + cm.Wait() + m.log.Infof("Connection to %s shut down", cexName) + } m.running.Store(false) }() diff --git a/client/mm/mm_simple_arb.go b/client/mm/mm_simple_arb.go new file mode 100644 index 0000000000..e6e294e34a --- /dev/null +++ b/client/mm/mm_simple_arb.go @@ -0,0 +1,544 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package mm + +import ( + "bytes" + "context" + "fmt" + "sort" + "sync" + "sync/atomic" + + "decred.org/dcrdex/client/core" + "decred.org/dcrdex/client/mm/libxc" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/calc" + "decred.org/dcrdex/dex/order" +) + +// SimpleArbConfig is the configuration for an arbitrage bot that only places +// orders when there is a profitable arbitrage opportunity. +type SimpleArbConfig struct { + // CEXName is the name of the cex that the bot will arbitrage. + CEXName string `json:"cexName"` + // ProfitTrigger is the minimum profit before a cross-exchange trade + // sequence is initiated. Range: 0 < ProfitTrigger << 1. For example, if + // the ProfitTrigger is 0.01 and a trade sequence would produce a 1% profit + // or better, a trade sequence will be initiated. + ProfitTrigger float64 `json:"profitTrigger"` + // MaxActiveArbs sets a limit on the number of active arbitrage sequences + // that can be open simultaneously. + MaxActiveArbs uint32 `json:"maxActiveArbs"` + // NumEpochsLeaveOpen is the number of epochs an arbitrage sequence will + // stay open if one or both of the orders were not filled. + NumEpochsLeaveOpen uint32 `json:"numEpochsLeaveOpen"` + // BaseOptions are the multi-order options for the base asset wallet. + BaseOptions map[string]string `json:"baseOptions"` + // QuoteOptions are the multi-order options for the quote asset wallet. + QuoteOptions map[string]string `json:"quoteOptions"` +} + +func (c *SimpleArbConfig) Validate() error { + if c.ProfitTrigger <= 0 || c.ProfitTrigger > 1 { + return fmt.Errorf("profit trigger must be 0 < t <= 1, but got %v", c.ProfitTrigger) + } + + if c.MaxActiveArbs == 0 { + return fmt.Errorf("must allow at least 1 active arb") + } + + if c.NumEpochsLeaveOpen < 2 { + return fmt.Errorf("arbs must be left open for at least 2 epochs") + } + + return nil +} + +// arbSequence represents an attempted arbitrage sequence. +type arbSequence struct { + dexOrder *core.Order + cexOrderID string + dexRate uint64 + cexRate uint64 + cexOrderFilled bool + dexOrderFilled bool + sellOnDEX bool + startEpoch uint64 +} + +type simpleArbMarketMaker struct { + ctx context.Context + host string + baseID uint32 + quoteID uint32 + cex libxc.CEX + // cexTradeUpdatesID is passed to the Trade function of the cex + // so that the cex knows to send update notifications for the + // trade back to this bot. + cexTradeUpdatesID int + core clientCore + log dex.Logger + cfg *SimpleArbConfig + mkt *core.Market + book dexOrderBook + rebalanceRunning atomic.Bool + + activeArbsMtx sync.RWMutex + activeArbs []*arbSequence +} + +// rebalance checks if there is an arbitrage opportunity between the dex and cex, +// and if so, executes trades to capitalize on it. +func (a *simpleArbMarketMaker) rebalance(newEpoch uint64) { + if !a.rebalanceRunning.CompareAndSwap(false, true) { + return + } + defer a.rebalanceRunning.Store(false) + + exists, sellOnDex, lotsToArb, dexRate, cexRate := a.arbExists() + if exists { + // Execution will not happen if it would cause a self-match. + a.executeArb(sellOnDex, lotsToArb, dexRate, cexRate, newEpoch) + } + + a.activeArbsMtx.Lock() + defer a.activeArbsMtx.Unlock() + + remainingArbs := make([]*arbSequence, 0, len(a.activeArbs)) + for _, arb := range a.activeArbs { + expired := newEpoch-arb.startEpoch > uint64(a.cfg.NumEpochsLeaveOpen) + oppositeDirectionArbFound := exists && sellOnDex != arb.sellOnDEX + + if expired || oppositeDirectionArbFound { + a.cancelArbSequence(arb) + } else { + remainingArbs = append(remainingArbs, arb) + } + } + + a.activeArbs = remainingArbs +} + +// arbExists checks if an arbitrage opportunity exists. +func (a *simpleArbMarketMaker) arbExists() (exists, sellOnDex bool, lotsToArb, dexRate, cexRate uint64) { + cexBaseBalance, err := a.cex.Balance(dex.BipIDSymbol(a.baseID)) + if err != nil { + a.log.Errorf("failed to get cex balance for %v: %v", dex.BipIDSymbol(a.baseID), err) + return false, false, 0, 0, 0 + } + + cexQuoteBalance, err := a.cex.Balance(dex.BipIDSymbol(a.quoteID)) + if err != nil { + a.log.Errorf("failed to get cex balance for %v: %v", dex.BipIDSymbol(a.quoteID), err) + return false, false, 0, 0, 0 + } + + sellOnDex = false + exists, lotsToArb, dexRate, cexRate = a.arbExistsOnSide(sellOnDex, cexBaseBalance.Available, cexQuoteBalance.Available) + if exists { + return + } + + sellOnDex = true + exists, lotsToArb, dexRate, cexRate = a.arbExistsOnSide(sellOnDex, cexBaseBalance.Available, cexQuoteBalance.Available) + return +} + +// arbExistsOnSide checks if an arbitrage opportunity exists either when +// buying or selling on the dex. +func (a *simpleArbMarketMaker) arbExistsOnSide(sellOnDEX bool, cexBaseBalance, cexQuoteBalance uint64) (exists bool, lotsToArb, dexRate, cexRate uint64) { + noArb := func() (bool, uint64, uint64, uint64) { + return false, 0, 0, 0 + } + + lotSize := a.mkt.LotSize + + // maxLots is the max amount of lots of the base asset that can be traded + // on the exchange where the base asset is being sold. + var maxLots uint64 + if sellOnDEX { + maxOrder, err := a.core.MaxSell(a.host, a.baseID, a.quoteID) + if err != nil { + a.log.Errorf("MaxSell error: %v", err) + return noArb() + } + maxLots = maxOrder.Swap.Lots + } else { + maxLots = cexBaseBalance / lotSize + } + if maxLots == 0 { + return noArb() + } + + for numLots := uint64(1); numLots <= maxLots; numLots++ { + dexAvg, dexExtrema, dexFilled, err := a.book.VWAP(numLots, a.mkt.LotSize, !sellOnDEX) + if err != nil { + a.log.Errorf("error calculating dex VWAP: %v", err) + return noArb() + } + if !dexFilled { + break + } + // If buying on dex, check that we have enough to buy at this rate. + if !sellOnDEX { + maxBuy, err := a.core.MaxBuy(a.host, a.baseID, a.quoteID, dexExtrema) + if err != nil { + a.log.Errorf("maxBuy error: %v") + return noArb() + } + if maxBuy.Swap.Lots < numLots { + break + } + } + + cexAvg, cexExtrema, cexFilled, err := a.cex.VWAP(dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), sellOnDEX, numLots*lotSize) + if err != nil { + a.log.Errorf("error calculating cex VWAP: %v", err) + return + } + if !cexFilled { + break + } + + // If buying on cex, make sure we have enough to buy at this rate + amountNeeded := calc.BaseToQuote(cexExtrema, numLots*lotSize) + if sellOnDEX && (amountNeeded > cexQuoteBalance) { + break + } + + var priceRatio float64 + if sellOnDEX { + priceRatio = float64(dexAvg) / float64(cexAvg) + } else { + priceRatio = float64(cexAvg) / float64(dexAvg) + } + + // Even if the average price ratio is > profit trigger, we still need + // check if the current lot is profitable. + var currLotProfitable bool + if sellOnDEX { + currLotProfitable = dexExtrema > cexExtrema + } else { + currLotProfitable = cexExtrema > dexExtrema + } + + if priceRatio > (1+a.cfg.ProfitTrigger) && currLotProfitable { + lotsToArb = numLots + dexRate = dexExtrema + cexRate = cexExtrema + } else { + break + } + } + + if lotsToArb > 0 { + a.log.Infof("arb opportunity - sellOnDex: %v, lotsToArb: %v, dexRate: %v, cexRate: %v", sellOnDEX, lotsToArb, dexRate, cexRate) + return true, lotsToArb, dexRate, cexRate + } + + return noArb() +} + +// executeArb will execute an arbitrage sequence by placing orders on the dex +// and cex. An entry will be added to the a.activeArbs slice if both orders +// are successfully placed. +func (a *simpleArbMarketMaker) executeArb(sellOnDex bool, lotsToArb, dexRate, cexRate, epoch uint64) { + a.log.Debugf("executing arb opportunity - sellOnDex: %v, lotsToArb: %v, dexRate: %v, cexRate: %v", sellOnDex, lotsToArb, dexRate, cexRate) + + a.activeArbsMtx.RLock() + numArbs := len(a.activeArbs) + a.activeArbsMtx.RUnlock() + if numArbs >= int(a.cfg.MaxActiveArbs) { + a.log.Infof("cannot execute arb because already at max arbs") + return + } + + if a.selfMatch(sellOnDex, dexRate) { + a.log.Infof("cannot execute arb opportunity due to self-match") + return + } + // also check self-match on CEX? + + // Hold the lock for this entire process because updates to the cex trade + // may come even before the Trade function has returned, and in order to + // be able to process them, the new arbSequence struct must already be in + // the activeArbs slice. + a.activeArbsMtx.Lock() + defer a.activeArbsMtx.Unlock() + + // Place cex order first. If placing dex order fails then can freely cancel cex order. + cexTradeID, err := a.cex.Trade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), !sellOnDex, cexRate, lotsToArb*a.mkt.LotSize, a.cexTradeUpdatesID) + if err != nil { + a.log.Errorf("error placing cex order: %v", err) + return + } + + var options map[string]string + if sellOnDex { + options = a.cfg.BaseOptions + } else { + options = a.cfg.QuoteOptions + } + + dexOrders, err := a.core.MultiTrade(nil, &core.MultiTradeForm{ + Host: a.host, + Sell: sellOnDex, + Base: a.baseID, + Quote: a.quoteID, + Placements: []*core.QtyRate{ + { + Qty: lotsToArb * a.mkt.LotSize, + Rate: dexRate, + }, + }, + Options: options, + }) + if err != nil || len(dexOrders) != 1 { + if err != nil { + a.log.Errorf("error placing dex order: %v", err) + } + if len(dexOrders) != 1 { + a.log.Errorf("expected 1 dex order, got %v", len(dexOrders)) + } + + err := a.cex.CancelTrade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), cexTradeID) + if err != nil { + a.log.Errorf("error canceling cex order: %v", err) + // TODO: keep retrying failed cancel + } + return + } + + a.activeArbs = append(a.activeArbs, &arbSequence{ + dexOrder: dexOrders[0], + dexRate: dexRate, + cexOrderID: cexTradeID, + cexRate: cexRate, + sellOnDEX: sellOnDex, + startEpoch: epoch, + }) +} + +func (a *simpleArbMarketMaker) sortedOrders() (buys, sells []*core.Order) { + buys, sells = make([]*core.Order, 0), make([]*core.Order, 0) + + a.activeArbsMtx.RLock() + for _, arb := range a.activeArbs { + if arb.sellOnDEX { + sells = append(sells, arb.dexOrder) + } else { + buys = append(buys, arb.dexOrder) + } + } + a.activeArbsMtx.RUnlock() + + sort.Slice(buys, func(i, j int) bool { return buys[i].Rate > buys[j].Rate }) + sort.Slice(sells, func(i, j int) bool { return sells[i].Rate < sells[j].Rate }) + + return buys, sells +} + +// selfMatch checks if a order could match with any other orders +// already placed on the dex. +func (a *simpleArbMarketMaker) selfMatch(sell bool, rate uint64) bool { + buys, sells := a.sortedOrders() + + if sell && len(buys) > 0 && buys[0].Rate >= rate { + return true + } + + if !sell && len(sells) > 0 && sells[0].Rate <= rate { + return true + } + + return false +} + +// cancelArbSequence will cancel both the dex and cex orders in an arb sequence +// if they have not yet been filled. +func (a *simpleArbMarketMaker) cancelArbSequence(arb *arbSequence) { + if !arb.cexOrderFilled { + err := a.cex.CancelTrade(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID), arb.cexOrderID) + if err != nil { + a.log.Errorf("failed to cancel cex trade ID %s: %v", arb.cexOrderID, err) + } + } + + if !arb.dexOrderFilled { + err := a.core.Cancel(arb.dexOrder.ID) + if err != nil { + a.log.Errorf("failed to cancel dex order ID %s: %v", arb.dexOrder.ID, err) + } + } + + // keep retrying if failed to cancel? +} + +// removeActiveArb removes the active arb at index i. +// +// activeArbsMtx MUST be held when calling this function. +func (a *simpleArbMarketMaker) removeActiveArb(i int) { + a.activeArbs[i] = a.activeArbs[len(a.activeArbs)-1] + a.activeArbs = a.activeArbs[:len(a.activeArbs)-1] +} + +// handleCEXTradeUpdate is called when the CEX sends a notification that the +// status of a trade has changed. +func (a *simpleArbMarketMaker) handleCEXTradeUpdate(update *libxc.TradeUpdate) { + if !update.Complete { + return + } + + a.activeArbsMtx.Lock() + defer a.activeArbsMtx.Unlock() + + for i, arb := range a.activeArbs { + if arb.cexOrderID == update.TradeID { + arb.cexOrderFilled = true + if arb.dexOrderFilled { + a.removeActiveArb(i) + } + return + } + } +} + +// handleDEXOrderUpdate is called when the DEX sends a notification that the +// status of an order has changed. +func (a *simpleArbMarketMaker) handleDEXOrderUpdate(o *core.Order) { + if o.Status <= order.OrderStatusBooked { + return + } + + a.activeArbsMtx.Lock() + defer a.activeArbsMtx.Unlock() + + for i, arb := range a.activeArbs { + if bytes.Equal(arb.dexOrder.ID, o.ID) { + arb.dexOrderFilled = true + if arb.cexOrderFilled { + a.removeActiveArb(i) + } + return + } + } +} + +func (m *simpleArbMarketMaker) handleNotification(note core.Notification) { + switch n := note.(type) { + case *core.OrderNote: + ord := n.Order + if ord == nil { + return + } + m.handleDEXOrderUpdate(ord) + } +} + +func (a *simpleArbMarketMaker) run() { + book, bookFeed, err := a.core.SyncBook(a.host, a.baseID, a.quoteID) + if err != nil { + a.log.Errorf("Failed to sync book: %v", err) + return + } + a.book = book + + err = a.cex.SubscribeMarket(a.ctx, dex.BipIDSymbol(a.baseID), dex.BipIDSymbol(a.quoteID)) + if err != nil { + a.log.Errorf("Failed to subscribe to cex market: %v", err) + return + } + + tradeUpdates, unsubscribe, tradeUpdatesID := a.cex.SubscribeTradeUpdates() + defer unsubscribe() + a.cexTradeUpdatesID = tradeUpdatesID + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case n := <-bookFeed.Next(): + if n.Action == core.EpochMatchSummary { + payload := n.Payload.(*core.EpochMatchSummaryPayload) + a.rebalance(payload.Epoch + 1) + } + case <-a.ctx.Done(): + return + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case update := <-tradeUpdates: + a.handleCEXTradeUpdate(update) + case <-a.ctx.Done(): + return + } + } + }() + + noteFeed := a.core.NotificationFeed() + + wg.Add(1) + go func() { + defer wg.Done() + defer noteFeed.ReturnFeed() + for { + select { + case n := <-noteFeed.C: + a.handleNotification(n) + case <-a.ctx.Done(): + return + } + } + }() + + wg.Wait() + + a.cancelAllOrders() +} + +func (a *simpleArbMarketMaker) cancelAllOrders() { + a.activeArbsMtx.Lock() + defer a.activeArbsMtx.Unlock() + + for _, arb := range a.activeArbs { + a.cancelArbSequence(arb) + } +} + +func RunSimpleArbBot(ctx context.Context, cfg *BotConfig, c clientCore, cex libxc.CEX, log dex.Logger) { + if cfg.ArbCfg == nil { + // implies bug in caller + log.Errorf("No arb config provided. Exiting.") + return + } + + mkt, err := c.ExchangeMarket(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset) + if err != nil { + log.Errorf("Failed to get market: %v", err) + return + } + + (&simpleArbMarketMaker{ + ctx: ctx, + host: cfg.Host, + baseID: cfg.BaseAsset, + quoteID: cfg.QuoteAsset, + cex: cex, + core: c, + log: log, + cfg: cfg.ArbCfg, + mkt: mkt, + activeArbs: make([]*arbSequence, 0), + }).run() +} diff --git a/client/mm/mm_simple_arb_test.go b/client/mm/mm_simple_arb_test.go new file mode 100644 index 0000000000..5d21a4f583 --- /dev/null +++ b/client/mm/mm_simple_arb_test.go @@ -0,0 +1,1219 @@ +package mm + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + "testing" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/client/core" + "decred.org/dcrdex/client/mm/libxc" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/calc" + "decred.org/dcrdex/dex/encode" + "decred.org/dcrdex/dex/order" +) + +var log = dex.StdOutLogger("T", dex.LevelTrace) + +type vwapResult struct { + avg uint64 + extrema uint64 +} + +type dexOrder struct { + lots, rate uint64 + sell bool +} + +type cexOrder struct { + baseSymbol, quoteSymbol string + qty, rate uint64 + sell bool +} + +type tCEX struct { + bidsVWAP map[uint64]vwapResult + asksVWAP map[uint64]vwapResult + vwapErr error + balances map[string]*libxc.ExchangeBalance + balanceErr error + + tradeID string + tradeErr error + lastTrade *cexOrder + + cancelledTrades []string + cancelTradeErr error + + tradeUpdates chan *libxc.TradeUpdate + tradeUpdatesID int +} + +func newTCEX() *tCEX { + return &tCEX{ + bidsVWAP: make(map[uint64]vwapResult), + asksVWAP: make(map[uint64]vwapResult), + balances: make(map[string]*libxc.ExchangeBalance), + cancelledTrades: make([]string, 0), + tradeUpdates: make(chan *libxc.TradeUpdate), + } +} + +func (c *tCEX) Connect(ctx context.Context) (*sync.WaitGroup, error) { + return nil, nil +} +func (c *tCEX) Balances() (map[uint32]*libxc.ExchangeBalance, error) { + return nil, nil +} +func (c *tCEX) Markets() ([]*libxc.Market, error) { + return nil, nil +} +func (c *tCEX) Balance(symbol string) (*libxc.ExchangeBalance, error) { + return c.balances[symbol], c.balanceErr +} +func (c *tCEX) Trade(ctx context.Context, baseSymbol, quoteSymbol string, sell bool, rate, qty uint64, updaterID int) (string, error) { + if c.tradeErr != nil { + return "", c.tradeErr + } + c.lastTrade = &cexOrder{baseSymbol, quoteSymbol, qty, rate, sell} + return c.tradeID, nil +} +func (c *tCEX) CancelTrade(ctx context.Context, baseSymbol, quoteSymbol, tradeID string) error { + if c.cancelTradeErr != nil { + return c.cancelTradeErr + } + c.cancelledTrades = append(c.cancelledTrades, tradeID) + return nil +} +func (c *tCEX) SubscribeMarket(ctx context.Context, baseSymbol, quoteSymbol string) error { + return nil +} +func (c *tCEX) UnsubscribeMarket(baseSymbol, quoteSymbol string) { +} +func (c *tCEX) VWAP(baseSymbol, quoteSymbol string, sell bool, qty uint64) (vwap, extrema uint64, filled bool, err error) { + if c.vwapErr != nil { + return 0, 0, false, c.vwapErr + } + + if sell { + res, found := c.asksVWAP[qty] + if !found { + return 0, 0, false, nil + } + return res.avg, res.extrema, true, nil + } + + res, found := c.bidsVWAP[qty] + if !found { + return 0, 0, false, nil + } + return res.avg, res.extrema, true, nil +} +func (c *tCEX) SubscribeTradeUpdates() (<-chan *libxc.TradeUpdate, func(), int) { + return c.tradeUpdates, func() {}, c.tradeUpdatesID +} +func (c *tCEX) SubscribeCEXUpdates() (<-chan interface{}, func()) { + return nil, func() {} +} + +var _ libxc.CEX = (*tCEX)(nil) + +func TestArbRebalance(t *testing.T) { + mkt := &core.Market{ + LotSize: uint64(40 * 1e8), + } + + orderIDs := make([]order.OrderID, 5) + for i := 0; i < 5; i++ { + copy(orderIDs[i][:], encode.RandomBytes(32)) + } + + cexTradeIDs := make([]string, 0, 5) + for i := 0; i < 5; i++ { + cexTradeIDs = append(cexTradeIDs, fmt.Sprintf("%x", encode.RandomBytes(32))) + } + + log := dex.StdOutLogger("T", dex.LevelTrace) + + var currEpoch uint64 = 100 + var numEpochsLeaveOpen uint32 = 10 + var maxActiveArbs uint32 = 5 + var profitTrigger float64 = 0.01 + + type testBooks struct { + dexBidsAvg []uint64 + dexBidsExtrema []uint64 + + dexAsksAvg []uint64 + dexAsksExtrema []uint64 + + cexBidsAvg []uint64 + cexBidsExtrema []uint64 + + cexAsksAvg []uint64 + cexAsksExtrema []uint64 + } + + noArbBooks := &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{2e6, 2.5e6}, + dexAsksExtrema: []uint64{2e6, 3e6}, + + cexBidsAvg: []uint64{1.9e6, 1.8e6}, + cexBidsExtrema: []uint64{1.85e6, 1.75e6}, + + cexAsksAvg: []uint64{2.1e6, 2.2e6}, + cexAsksExtrema: []uint64{2.2e6, 2.3e6}, + } + + arbBuyOnDEXBooks := &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{2e6, 2.5e6}, + dexAsksExtrema: []uint64{2e6, 3e6}, + + cexBidsAvg: []uint64{2.3e6, 2.1e6}, + cexBidsExtrema: []uint64{2.2e6, 1.9e6}, + + cexAsksAvg: []uint64{2.4e6, 2.6e6}, + cexAsksExtrema: []uint64{2.5e6, 2.7e6}, + } + + arbSellOnDEXBooks := &testBooks{ + cexBidsAvg: []uint64{1.8e6, 1.7e6}, + cexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + cexAsksAvg: []uint64{2e6, 2.5e6}, + cexAsksExtrema: []uint64{2e6, 3e6}, + + dexBidsAvg: []uint64{2.3e6, 2.1e6}, + dexBidsExtrema: []uint64{2.2e6, 1.9e6}, + + dexAsksAvg: []uint64{2.4e6, 2.6e6}, + dexAsksExtrema: []uint64{2.5e6, 2.7e6}, + } + + arb2LotsBuyOnDEXBooks := &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{2e6, 2e6, 2.5e6}, + dexAsksExtrema: []uint64{2e6, 2e6, 3e6}, + + cexBidsAvg: []uint64{2.3e6, 2.2e6, 2.1e6}, + cexBidsExtrema: []uint64{2.2e6, 2.2e6, 1.9e6}, + + cexAsksAvg: []uint64{2.4e6, 2.6e6}, + cexAsksExtrema: []uint64{2.5e6, 2.7e6}, + } + + arb2LotsSellOnDEXBooks := &testBooks{ + cexBidsAvg: []uint64{1.8e6, 1.7e6}, + cexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + cexAsksAvg: []uint64{2e6, 2e6, 2.5e6}, + cexAsksExtrema: []uint64{2e6, 2e6, 3e6}, + + dexBidsAvg: []uint64{2.3e6, 2.2e6, 2.1e6}, + dexBidsExtrema: []uint64{2.2e6, 2.2e6, 1.9e6}, + + dexAsksAvg: []uint64{2.4e6, 2.6e6}, + dexAsksExtrema: []uint64{2.5e6, 2.7e6}, + } + + // Arbing 2 lots worth would still be above profit trigger, but the + // second lot on its own would not be. + arb2LotsButOneWorth := &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{2e6, 2.1e6}, + dexAsksExtrema: []uint64{2e6, 2.2e6}, + + cexBidsAvg: []uint64{2.3e6, 2.122e6}, + cexBidsExtrema: []uint64{2.2e6, 2.1e6}, + + cexAsksAvg: []uint64{2.4e6, 2.6e6}, + cexAsksExtrema: []uint64{2.5e6, 2.7e6}, + } + + type test struct { + name string + books *testBooks + dexMaxSell *core.MaxOrderEstimate + dexMaxBuy *core.MaxOrderEstimate + dexMaxSellErr error + dexMaxBuyErr error + cexBalances map[string]*libxc.ExchangeBalance + dexVWAPErr error + cexVWAPErr error + cexTradeErr error + existingArbs []*arbSequence + + expectedDexOrder *dexOrder + expectedCexOrder *cexOrder + expectedDEXCancels []dex.Bytes + expectedCEXCancels []string + //expectedActiveArbs []*arbSequence + } + + tests := []test{ + // "no arb" + { + name: "no arb", + books: noArbBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + }, + // "1 lot, buy on dex, sell on cex" + { + name: "1 lot, buy on dex, sell on cex", + books: arbBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2e6, + sell: false, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2.2e6, + sell: true, + }, + }, + // "1 lot, buy on dex, sell on cex, but dex base balance not enough" + { + name: "1 lot, buy on dex, sell on cex, but cex base balance not enough", + books: arbBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: mkt.LotSize / 2}, + }, + }, + // "2 lot, buy on dex, sell on cex, but dex quote balance only enough for 1" + { + name: "2 lot, buy on dex, sell on cex, but dex quote balance only enough for 1", + books: arb2LotsBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 1, + }, + }, + + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2e6, + sell: false, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2.2e6, + sell: true, + }, + }, + // "1 lot, sell on dex, buy on cex" + { + name: "1 lot, sell on dex, buy on cex", + books: arbSellOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2.2e6, + sell: true, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2e6, + sell: false, + }, + }, + // "2 lot, buy on cex, sell on dex, but cex quote balance only enough for 1" + { + name: "2 lot, buy on cex, sell on dex, but cex quote balance only enough for 1", + books: arb2LotsSellOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: calc.BaseToQuote(2e6, mkt.LotSize*3/2)}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2.2e6, + sell: true, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2e6, + sell: false, + }, + }, + // "1 lot, sell on dex, buy on cex" + { + name: "1 lot, sell on dex, buy on cex", + books: arbSellOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2.2e6, + sell: true, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2e6, + sell: false, + }, + }, + // "2 lots arb still above profit trigger, but second not worth it on its own" + { + name: "2 lots arb still above profit trigger, but second not worth it on its own", + books: arb2LotsButOneWorth, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2e6, + sell: false, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2.2e6, + sell: true, + }, + }, + // "2 lot, buy on cex, sell on dex, but cex quote balance only enough for 1" + { + name: "2 lot, buy on cex, sell on dex, but cex quote balance only enough for 1", + books: arb2LotsSellOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: calc.BaseToQuote(2e6, mkt.LotSize*3/2)}, + "dcr": {Available: 1e19}, + }, + expectedDexOrder: &dexOrder{ + lots: 1, + rate: 2.2e6, + sell: true, + }, + expectedCexOrder: &cexOrder{ + baseSymbol: "dcr", + quoteSymbol: "btc", + qty: mkt.LotSize, + rate: 2e6, + sell: false, + }, + }, + // "cex no asks" + { + name: "cex no asks", + books: &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{2e6, 2.5e6}, + dexAsksExtrema: []uint64{2e6, 3e6}, + + cexBidsAvg: []uint64{1.9e6, 1.8e6}, + cexBidsExtrema: []uint64{1.85e6, 1.75e6}, + + cexAsksAvg: []uint64{}, + cexAsksExtrema: []uint64{}, + }, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + }, + // "dex no asks" + { + name: "dex no asks", + books: &testBooks{ + dexBidsAvg: []uint64{1.8e6, 1.7e6}, + dexBidsExtrema: []uint64{1.7e6, 1.6e6}, + + dexAsksAvg: []uint64{}, + dexAsksExtrema: []uint64{}, + + cexBidsAvg: []uint64{1.9e6, 1.8e6}, + cexBidsExtrema: []uint64{1.85e6, 1.75e6}, + + cexAsksAvg: []uint64{2.1e6, 2.2e6}, + cexAsksExtrema: []uint64{2.2e6, 2.3e6}, + }, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + }, + // "dex max sell error" + { + name: "dex max sell error", + books: arbSellOnDEXBooks, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + dexMaxSellErr: errors.New(""), + }, + // "dex max buy error" + { + name: "dex max buy error", + books: arbBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + dexMaxBuyErr: errors.New(""), + }, + // "dex vwap error" + { + name: "dex vwap error", + books: arbBuyOnDEXBooks, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + dexVWAPErr: errors.New(""), + }, + // "cex vwap error" + { + name: "cex vwap error", + books: arbBuyOnDEXBooks, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + cexVWAPErr: errors.New(""), + }, + // "self-match" + { + name: "self-match", + books: arbSellOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + + existingArbs: []*arbSequence{{ + dexOrder: &core.Order{ + ID: orderIDs[0][:], + Rate: 2.2e6, + }, + cexOrderID: cexTradeIDs[0], + sellOnDEX: false, + startEpoch: currEpoch - 2, + }}, + + expectedCEXCancels: []string{cexTradeIDs[0]}, + expectedDEXCancels: []dex.Bytes{orderIDs[0][:]}, + }, + // "remove expired active arbs" + { + name: "remove expired active arbs", + books: noArbBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + existingArbs: []*arbSequence{ + { + dexOrder: &core.Order{ + ID: orderIDs[0][:], + }, + cexOrderID: cexTradeIDs[0], + sellOnDEX: false, + startEpoch: currEpoch - 2, + }, + { + dexOrder: &core.Order{ + ID: orderIDs[1][:], + }, + cexOrderID: cexTradeIDs[1], + sellOnDEX: false, + startEpoch: currEpoch - (uint64(numEpochsLeaveOpen) + 2), + }, + { + dexOrder: &core.Order{ + ID: orderIDs[2][:], + }, + cexOrderID: cexTradeIDs[2], + sellOnDEX: false, + cexOrderFilled: true, + startEpoch: currEpoch - (uint64(numEpochsLeaveOpen) + 2), + }, + { + dexOrder: &core.Order{ + ID: orderIDs[3][:], + }, + cexOrderID: cexTradeIDs[3], + sellOnDEX: false, + dexOrderFilled: true, + startEpoch: currEpoch - (uint64(numEpochsLeaveOpen) + 2), + }, + }, + expectedCEXCancels: []string{cexTradeIDs[1], cexTradeIDs[3]}, + expectedDEXCancels: []dex.Bytes{orderIDs[1][:], orderIDs[2][:]}, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + }, + // "already max active arbs" + { + name: "already max active arbs", + books: arbBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + existingArbs: []*arbSequence{ + { + dexOrder: &core.Order{ + ID: orderIDs[0][:], + }, + cexOrderID: cexTradeIDs[0], + sellOnDEX: false, + startEpoch: currEpoch - 1, + }, + { + dexOrder: &core.Order{ + ID: orderIDs[1][:], + }, + cexOrderID: cexTradeIDs[2], + sellOnDEX: false, + startEpoch: currEpoch - 2, + }, + { + dexOrder: &core.Order{ + ID: orderIDs[2][:], + }, + cexOrderID: cexTradeIDs[2], + sellOnDEX: false, + startEpoch: currEpoch - 3, + }, + { + dexOrder: &core.Order{ + ID: orderIDs[3][:], + }, + cexOrderID: cexTradeIDs[3], + sellOnDEX: false, + startEpoch: currEpoch - 4, + }, + { + dexOrder: &core.Order{ + ID: orderIDs[4][:], + }, + cexOrderID: cexTradeIDs[4], + sellOnDEX: false, + startEpoch: currEpoch - 5, + }, + }, + }, + // "cex trade error" + { + name: "cex trade error", + books: arbBuyOnDEXBooks, + dexMaxSell: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + dexMaxBuy: &core.MaxOrderEstimate{ + Swap: &asset.SwapEstimate{ + Lots: 5, + }, + }, + cexBalances: map[string]*libxc.ExchangeBalance{ + "btc": {Available: 1e19}, + "dcr": {Available: 1e19}, + }, + cexTradeErr: errors.New(""), + }, + } + + runTest := func(test *test) { + cex := newTCEX() + cex.vwapErr = test.cexVWAPErr + cex.balances = test.cexBalances + cex.tradeErr = test.cexTradeErr + + tCore := newTCore() + tCore.maxBuyEstimate = test.dexMaxBuy + tCore.maxSellEstimate = test.dexMaxSell + tCore.maxSellErr = test.dexMaxSellErr + tCore.maxBuyErr = test.dexMaxBuyErr + if test.expectedDexOrder != nil { + tCore.multiTradeResult = []*core.Order{ + { + ID: encode.RandomBytes(32), + }, + } + } + + orderBook := &tOrderBook{ + bidsVWAP: make(map[uint64]vwapResult), + asksVWAP: make(map[uint64]vwapResult), + vwapErr: test.dexVWAPErr, + } + for i := range test.books.dexBidsAvg { + orderBook.bidsVWAP[uint64(i+1)] = vwapResult{test.books.dexBidsAvg[i], test.books.dexBidsExtrema[i]} + } + for i := range test.books.dexAsksAvg { + orderBook.asksVWAP[uint64(i+1)] = vwapResult{test.books.dexAsksAvg[i], test.books.dexAsksExtrema[i]} + } + for i := range test.books.cexBidsAvg { + cex.bidsVWAP[uint64(i+1)*mkt.LotSize] = vwapResult{test.books.cexBidsAvg[i], test.books.cexBidsExtrema[i]} + } + for i := range test.books.cexAsksAvg { + cex.asksVWAP[uint64(i+1)*mkt.LotSize] = vwapResult{test.books.cexAsksAvg[i], test.books.cexAsksExtrema[i]} + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + arbEngine := &simpleArbMarketMaker{ + ctx: ctx, + log: log, + cex: cex, + mkt: mkt, + baseID: 42, + quoteID: 0, + core: tCore, + activeArbs: test.existingArbs, + cfg: &SimpleArbConfig{ + ProfitTrigger: profitTrigger, + MaxActiveArbs: maxActiveArbs, + NumEpochsLeaveOpen: numEpochsLeaveOpen, + }, + } + + go arbEngine.run() + + dummyNote := &core.BookUpdate{} + tCore.bookFeed.c <- dummyNote + tCore.bookFeed.c <- dummyNote + arbEngine.book = orderBook + tCore.bookFeed.c <- &core.BookUpdate{ + Action: core.EpochMatchSummary, + Payload: &core.EpochMatchSummaryPayload{ + Epoch: currEpoch - 1, + }, + } + tCore.bookFeed.c <- dummyNote + tCore.bookFeed.c <- dummyNote + + // Check dex trade + if test.expectedDexOrder == nil { + if len(tCore.buysPlaced) > 0 || len(tCore.sellsPlaced) > 0 { + t.Fatalf("%s: expected no dex order but got %d buys and %d sells", test.name, len(tCore.buysPlaced), len(tCore.sellsPlaced)) + } + } + if test.expectedDexOrder != nil { + if test.expectedDexOrder.sell { + if len(tCore.multiTradesPlaced[0].Placements) != 1 { + t.Fatalf("%s: expected 1 sell order but got %d", test.name, len(tCore.sellsPlaced)) + } + if !tCore.multiTradesPlaced[0].Sell { + t.Fatalf("%s: expected sell order but got buy order", test.name) + } + if test.expectedDexOrder.rate != tCore.multiTradesPlaced[0].Placements[0].Rate { + t.Fatalf("%s: expected sell order rate %d but got %d", test.name, test.expectedDexOrder.rate, tCore.sellsPlaced[0].Rate) + } + if test.expectedDexOrder.lots*mkt.LotSize != tCore.multiTradesPlaced[0].Placements[0].Qty { + t.Fatalf("%s: expected sell order qty %d but got %d", test.name, test.expectedDexOrder.lots*mkt.LotSize, tCore.sellsPlaced[0].Qty) + } + } + + if !test.expectedDexOrder.sell { + fmt.Printf("multi trades placed: %v\n", tCore.multiTradesPlaced) + if len(tCore.multiTradesPlaced[0].Placements) != 1 { + t.Fatalf("%s: expected 1 buy order but got %d", test.name, len(tCore.buysPlaced)) + } + if tCore.multiTradesPlaced[0].Sell { + t.Fatalf("%s: expected buy order but got sell order", test.name) + } + if test.expectedDexOrder.rate != tCore.multiTradesPlaced[0].Placements[0].Rate { + t.Fatalf("%s: expected buy order rate %d but got %d", test.name, test.expectedDexOrder.rate, tCore.buysPlaced[0].Rate) + } + if test.expectedDexOrder.lots*mkt.LotSize != tCore.multiTradesPlaced[0].Placements[0].Qty { + t.Fatalf("%s: expected buy order qty %d but got %d", test.name, test.expectedDexOrder.lots*mkt.LotSize, tCore.buysPlaced[0].Qty) + } + } + } + + // Check cex trade + if (test.expectedCexOrder == nil) != (cex.lastTrade == nil) { + t.Fatalf("%s: expected cex order %v but got %v", test.name, (test.expectedCexOrder != nil), (cex.lastTrade != nil)) + } + if cex.lastTrade != nil && + *cex.lastTrade != *test.expectedCexOrder { + t.Fatalf("%s: cex order %+v != expected %+v", test.name, cex.lastTrade, test.expectedCexOrder) + } + + // Check dex cancels + if len(test.expectedDEXCancels) != len(tCore.cancelsPlaced) { + t.Fatalf("%s: expected %d cancels but got %d", test.name, len(test.expectedDEXCancels), len(tCore.cancelsPlaced)) + } + for i := range test.expectedDEXCancels { + if !bytes.Equal(test.expectedDEXCancels[i], tCore.cancelsPlaced[i]) { + t.Fatalf("%s: expected cancel %x but got %x", test.name, test.expectedDEXCancels[i], tCore.cancelsPlaced[i]) + } + } + + // Check cex cancels + if len(test.expectedCEXCancels) != len(cex.cancelledTrades) { + t.Fatalf("%s: expected %d cex cancels but got %d", test.name, len(test.expectedCEXCancels), len(cex.cancelledTrades)) + } + for i := range test.expectedCEXCancels { + if test.expectedCEXCancels[i] != cex.cancelledTrades[i] { + t.Fatalf("%s: expected cex cancel %s but got %s", test.name, test.expectedCEXCancels[i], cex.cancelledTrades[i]) + } + } + } + + for _, test := range tests { + runTest(&test) + } +} + +func TestArbDexTradeUpdates(t *testing.T) { + orderIDs := make([]order.OrderID, 5) + for i := 0; i < 5; i++ { + copy(orderIDs[i][:], encode.RandomBytes(32)) + } + + cexTradeIDs := make([]string, 0, 5) + for i := 0; i < 5; i++ { + cexTradeIDs = append(cexTradeIDs, fmt.Sprintf("%x", encode.RandomBytes(32))) + } + + type test struct { + name string + activeArbs []*arbSequence + updatedOrderID []byte + updatedOrderStatus order.OrderStatus + expectedActiveArbs []*arbSequence + } + + dexOrder := &core.Order{ + ID: orderIDs[0][:], + } + + tests := []*test{ + { + name: "dex order still booked", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + updatedOrderID: orderIDs[0][:], + updatedOrderStatus: order.OrderStatusBooked, + expectedActiveArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + }, + { + name: "dex order executed, but cex not yet filled", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + updatedOrderID: orderIDs[0][:], + updatedOrderStatus: order.OrderStatusExecuted, + expectedActiveArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + dexOrderFilled: true, + }, + }, + }, + { + name: "dex order executed, but cex already filled", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + cexOrderFilled: true, + }, + }, + updatedOrderID: orderIDs[0][:], + updatedOrderStatus: order.OrderStatusExecuted, + expectedActiveArbs: []*arbSequence{}, + }, + } + + runTest := func(test *test) { + cex := newTCEX() + tCore := newTCore() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + arbEngine := &simpleArbMarketMaker{ + ctx: ctx, + log: log, + cex: cex, + baseID: 42, + quoteID: 0, + core: tCore, + activeArbs: test.activeArbs, + cfg: &SimpleArbConfig{ + ProfitTrigger: 0.01, + MaxActiveArbs: 5, + NumEpochsLeaveOpen: 10, + }, + } + + go arbEngine.run() + + tCore.noteFeed <- &core.OrderNote{ + Order: &core.Order{ + Status: test.updatedOrderStatus, + ID: test.updatedOrderID, + }, + } + dummyNote := &core.BondRefundNote{} + tCore.noteFeed <- dummyNote + + if len(test.expectedActiveArbs) != len(arbEngine.activeArbs) { + t.Fatalf("%s: expected %d active arbs but got %d", test.name, len(test.expectedActiveArbs), len(arbEngine.activeArbs)) + } + + for i := range test.expectedActiveArbs { + if *arbEngine.activeArbs[i] != *test.expectedActiveArbs[i] { + t.Fatalf("%s: active arb %+v != expected active arb %+v", test.name, arbEngine.activeArbs[i], test.expectedActiveArbs[i]) + } + } + } + + for _, test := range tests { + runTest(test) + } +} + +func TestCexTradeUpdates(t *testing.T) { + orderIDs := make([]order.OrderID, 5) + for i := 0; i < 5; i++ { + copy(orderIDs[i][:], encode.RandomBytes(32)) + } + + cexTradeIDs := make([]string, 0, 5) + for i := 0; i < 5; i++ { + cexTradeIDs = append(cexTradeIDs, fmt.Sprintf("%x", encode.RandomBytes(32))) + } + + dexOrder := &core.Order{ + ID: orderIDs[0][:], + } + + type test struct { + name string + activeArbs []*arbSequence + updatedOrderID string + orderComplete bool + expectedActiveArbs []*arbSequence + } + + tests := []*test{ + { + name: "neither complete", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + updatedOrderID: cexTradeIDs[0], + orderComplete: false, + expectedActiveArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + }, + { + name: "cex complete, but dex order not complete", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + }, + }, + updatedOrderID: cexTradeIDs[0], + orderComplete: true, + expectedActiveArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + cexOrderFilled: true, + }, + }, + }, + { + name: "both complete", + activeArbs: []*arbSequence{ + { + dexOrder: dexOrder, + cexOrderID: cexTradeIDs[0], + dexOrderFilled: true, + }, + }, + updatedOrderID: cexTradeIDs[0], + orderComplete: true, + }, + } + + runTest := func(test *test) { + cex := newTCEX() + tCore := newTCore() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + arbEngine := &simpleArbMarketMaker{ + ctx: ctx, + log: log, + cex: cex, + baseID: 42, + quoteID: 0, + core: tCore, + activeArbs: test.activeArbs, + cfg: &SimpleArbConfig{ + ProfitTrigger: 0.01, + MaxActiveArbs: 5, + NumEpochsLeaveOpen: 10, + }, + } + + go arbEngine.run() + + cex.tradeUpdates <- &libxc.TradeUpdate{ + TradeID: test.updatedOrderID, + Complete: test.orderComplete, + } + // send dummy update + cex.tradeUpdates <- &libxc.TradeUpdate{ + TradeID: "", + } + + if len(test.expectedActiveArbs) != len(arbEngine.activeArbs) { + t.Fatalf("%s: expected %d active arbs but got %d", test.name, len(test.expectedActiveArbs), len(arbEngine.activeArbs)) + } + for i := range test.expectedActiveArbs { + if *arbEngine.activeArbs[i] != *test.expectedActiveArbs[i] { + t.Fatalf("%s: active arb %+v != expected active arb %+v", test.name, arbEngine.activeArbs[i], test.expectedActiveArbs[i]) + } + } + } + + for _, test := range tests { + runTest(test) + } +} diff --git a/client/mm/mm_test.go b/client/mm/mm_test.go index fbb2b2f117..3693e75d98 100644 --- a/client/mm/mm_test.go +++ b/client/mm/mm_test.go @@ -117,6 +117,14 @@ func (drv *tDriver) Info() *asset.WalletInfo { return drv.winfo } +type tBookFeed struct { + c chan *core.BookUpdate +} + +func (t *tBookFeed) Next() <-chan *core.BookUpdate { return t.c } +func (t *tBookFeed) Close() {} +func (t *tBookFeed) Candles(dur string) error { return nil } + type tCore struct { assetBalances map[uint32]*core.WalletBalance assetBalanceErr error @@ -143,6 +151,8 @@ type tCore struct { sellsPlaced []*core.TradeForm multiTradesPlaced []*core.MultiTradeForm maxFundingFees uint64 + book *orderbook.OrderBook + bookFeed *tBookFeed } func (c *tCore) NotificationFeed() *core.NoteFeed { @@ -152,16 +162,10 @@ func (c *tCore) ExchangeMarket(host string, base, quote uint32) (*core.Market, e return c.market, nil } -type tBookFeed struct{} - -func (t *tBookFeed) Next() <-chan *core.BookUpdate { return make(<-chan *core.BookUpdate) } -func (t *tBookFeed) Close() {} -func (t *tBookFeed) Candles(string) error { return nil } - var _ core.BookFeed = (*tBookFeed)(nil) -func (*tCore) SyncBook(host string, base, quote uint32) (*orderbook.OrderBook, core.BookFeed, error) { - return nil, &tBookFeed{}, nil +func (t *tCore) SyncBook(host string, base, quote uint32) (*orderbook.OrderBook, core.BookFeed, error) { + return t.book, t.bookFeed, nil } func (*tCore) SupportedAssets() map[uint32]*core.SupportedAsset { return nil @@ -175,7 +179,8 @@ func (c *tCore) SingleLotFees(form *core.SingleLotFeesForm) (uint64, uint64, uin } return c.buySwapFees, c.buyRedeemFees, c.buyRefundFees, nil } -func (*tCore) Cancel(oidB dex.Bytes) error { +func (t *tCore) Cancel(oidB dex.Bytes) error { + t.cancelsPlaced = append(t.cancelsPlaced, oidB) return nil } func (c *tCore) Trade(pw []byte, form *core.TradeForm) (*core.Order, error) { @@ -278,12 +283,41 @@ func newTCore() *tCore { cancelsPlaced: make([]dex.Bytes, 0), buysPlaced: make([]*core.TradeForm, 0), sellsPlaced: make([]*core.TradeForm, 0), + bookFeed: &tBookFeed{ + c: make(chan *core.BookUpdate, 1), + }, } } type tOrderBook struct { midGap uint64 midGapErr error + + bidsVWAP map[uint64]vwapResult + asksVWAP map[uint64]vwapResult + vwapErr error +} + +var _ dexOrderBook = (*tOrderBook)(nil) + +func (t *tOrderBook) VWAP(numLots, _ uint64, sell bool) (avg, extrema uint64, filled bool, err error) { + if t.vwapErr != nil { + return 0, 0, false, t.vwapErr + } + + if sell { + res, found := t.asksVWAP[numLots] + if !found { + return 0, 0, false, nil + } + return res.avg, res.extrema, true, nil + } + + res, found := t.bidsVWAP[numLots] + if !found { + return 0, 0, false, nil + } + return res.avg, res.extrema, true, nil } func (o *tOrderBook) MidGap() (uint64, error) { @@ -4092,7 +4126,7 @@ func testSegregatedCoreTrade(t *testing.T, testMultiTrade bool) { mm.doNotKillWhenBotsStop = true ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = mm.Run(ctx, []*BotConfig{test.cfg}, []byte{}) + err = mm.Run(ctx, []*BotConfig{test.cfg}, []*CEXConfig{}, []byte{}) if err != nil { t.Fatalf("%s: unexpected error: %v", test.name, err) } diff --git a/client/mm/sample-config.json b/client/mm/sample-config.json index 407a022ea8..0ee747c611 100644 --- a/client/mm/sample-config.json +++ b/client/mm/sample-config.json @@ -1,37 +1,33 @@ -[{ - "host": "127.0.0.1:17273", - "baseAsset": 42, - "quoteAsset": 0, - "baseBalanceType": 0, - "baseBalance": 50, - "quoteBalanceType": 0, - "quoteBalance": 50, - "marketMakingConfig": { - "gapStrategy": "multiplier", - "sellPlacements": [{ - "gapFactor": 2, - "lots": 2 - },{ - "gapFactor": 3, - "lots": 1 - }], - "buyPlacements": [{ - "gapFactor": 2, - "lots": 2 - },{ - "gapFactor": 3, - "lots": 1 - }], - "driftTolerance": 0.001, - "oracleWeighting": 0.5, - "oracleBias": 0, - "emptyMarketRate": 0.005, - "baseOptions": { - "multisplit": "true" - }, - "quoteOptions": { - "multisplit": "true", - "multisplitbuffer": "5" +{ + "botCfgs": [ + { + "host": "127.0.0.1:17273", + "baseAsset": 60, + "quoteAsset": 0, + "baseBalanceType": 0, + "quoteBalanceType": 0, + "baseBalance": 100, + "quoteBalance": 100, + "arbConfig": { + "cexName": "Binance", + "profitTrigger": 0.01, + "maxActiveArbs": 5, + "numEpochsLeaveOpen": 5 + }, + "baseOptions": { + "multisplit": "true" + }, + "quoteOptions": { + "multisplit": "true", + "multisplitbuffer": "5" + } } - } -}] + ], + "cexCfgs": [ + { + "cexName": "Binance", + "apiKey": "", + "apiSecret": "" + } + ] +} diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index e43af5b177..edac0d66a6 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -476,26 +476,49 @@ func (ob *OrderBook) Unbook(note *msgjson.UnbookOrderNote) error { } // BestNOrders returns the best n orders from the provided side. -// NOTE: This is UNUSED, and test coverage is a near dup of bookside_test.go. -func (ob *OrderBook) BestNOrders(n int, side uint8) ([]*Order, bool, error) { +func (ob *OrderBook) BestNOrders(n int, sell bool) ([]*Order, bool, error) { if !ob.isSynced() { return nil, false, fmt.Errorf("order book is unsynced") } var orders []*Order var filled bool - switch side { - case msgjson.BuyOrderNum: + if sell { + orders, filled = ob.sells.BestNOrders(n) + } else { orders, filled = ob.buys.BestNOrders(n) + } - case msgjson.SellOrderNum: - orders, filled = ob.sells.BestNOrders(n) + return orders, filled, nil +} - default: - return nil, false, fmt.Errorf("unknown side provided: %d", side) +// VWAP calculates the volume weighted average price for the specified number +// of lots. +func (ob *OrderBook) VWAP(lots, lotSize uint64, sell bool) (avg, extrema uint64, filled bool, err error) { + orders, _, err := ob.BestNOrders(int(lots), sell) + if err != nil { + return 0, 0, false, err + } + + remainingLots := lots + var weightedSum uint64 + for _, order := range orders { + extrema = order.Rate + lotsInOrder := order.Quantity / lotSize + if lotsInOrder >= remainingLots { + weightedSum += remainingLots * extrema + filled = true + break + } + remainingLots -= lotsInOrder + weightedSum += lotsInOrder * extrema } - return orders, filled, nil + if !filled { + return 0, 0, false, nil + } + + return weightedSum / lots, extrema, true, nil } // Orders is the full order book, as slices of sorted buys and sells, and diff --git a/client/orderbook/orderbook_test.go b/client/orderbook/orderbook_test.go index 5fbe89aa13..ec67cda8c5 100644 --- a/client/orderbook/orderbook_test.go +++ b/client/orderbook/orderbook_test.go @@ -643,7 +643,7 @@ func TestOrderBookBestNOrders(t *testing.T) { label string orderBook *OrderBook n int - side uint8 + sell bool expected []*Order wantErr bool }{ @@ -662,7 +662,7 @@ func TestOrderBookBestNOrders(t *testing.T) { true, ), n: 3, - side: msgjson.BuyOrderNum, + sell: false, expected: []*Order{ makeOrder([32]byte{'e'}, msgjson.BuyOrderNum, 8, 4, 12), makeOrder([32]byte{'d'}, msgjson.BuyOrderNum, 5, 3, 10), @@ -685,7 +685,7 @@ func TestOrderBookBestNOrders(t *testing.T) { true, ), n: 3, - side: msgjson.SellOrderNum, + sell: true, expected: []*Order{}, wantErr: false, }, @@ -704,7 +704,7 @@ func TestOrderBookBestNOrders(t *testing.T) { true, ), n: 5, - side: msgjson.SellOrderNum, + sell: true, expected: []*Order{ makeOrder([32]byte{'b'}, msgjson.SellOrderNum, 10, 1, 2), makeOrder([32]byte{'c'}, msgjson.SellOrderNum, 10, 2, 5), @@ -728,7 +728,7 @@ func TestOrderBookBestNOrders(t *testing.T) { false, ), n: 5, - side: msgjson.SellOrderNum, + sell: true, expected: nil, wantErr: true, }, @@ -747,7 +747,7 @@ func TestOrderBookBestNOrders(t *testing.T) { true, ), n: 3, - side: msgjson.SellOrderNum, + sell: true, expected: []*Order{ makeOrder([32]byte{'b'}, msgjson.SellOrderNum, 10, 1, 2), makeOrder([32]byte{'c'}, msgjson.SellOrderNum, 10, 2, 5), @@ -758,7 +758,7 @@ func TestOrderBookBestNOrders(t *testing.T) { } for idx, tc := range tests { - best, _, err := tc.orderBook.BestNOrders(tc.n, tc.side) + best, _, err := tc.orderBook.BestNOrders(tc.n, tc.sell) if (err != nil) != tc.wantErr { t.Fatalf("[OrderBook.BestNOrders] #%d: error: %v, wantErr: %v", idx+1, err, tc.wantErr) @@ -917,6 +917,140 @@ func TestOrderBookBestFill(t *testing.T) { } } +func TestVWAP(t *testing.T) { + orders := []*Order{ + // buys + makeOrder([32]byte{'b'}, msgjson.BuyOrderNum, 10, 200, 2), + makeOrder([32]byte{'b'}, msgjson.BuyOrderNum, 20, 180, 2), + makeOrder([32]byte{'b'}, msgjson.BuyOrderNum, 10, 160, 2), + + // sells + makeOrder([32]byte{'b'}, msgjson.SellOrderNum, 10, 220, 2), + makeOrder([32]byte{'b'}, msgjson.SellOrderNum, 20, 240, 2), + makeOrder([32]byte{'b'}, msgjson.SellOrderNum, 10, 260, 2), + } + + ob := makeOrderBook(1, "ob", orders, make([]*cachedOrderNote, 0), true) + + type test struct { + sell bool + lots uint64 + lotSize uint64 + + expectedFilled bool + expectedAvg uint64 + expectedExtrema uint64 + } + + tests := []test{ + { + sell: true, + lots: 1, + lotSize: 10, + + expectedFilled: true, + expectedAvg: 220, + expectedExtrema: 220, + }, + { + sell: true, + lots: 2, + lotSize: 10, + + expectedFilled: true, + expectedAvg: 230, + expectedExtrema: 240, + }, + { + sell: true, + lots: 3, + lotSize: 10, + + expectedFilled: true, + expectedAvg: (220 + 240 + 240) / 3, + expectedExtrema: 240, + }, + { + sell: true, + lots: 4, + lotSize: 10, + + expectedFilled: true, + expectedAvg: (220 + 240 + 240 + 260) / 4, + expectedExtrema: 260, + }, + { + sell: true, + lots: 5, + lotSize: 10, + + expectedFilled: false, + }, + { + sell: false, + lots: 1, + lotSize: 10, + + expectedFilled: true, + expectedAvg: 200, + expectedExtrema: 200, + }, + { + sell: false, + lots: 2, + lotSize: 10, + + expectedFilled: true, + expectedAvg: 190, + expectedExtrema: 180, + }, + { + sell: false, + lots: 3, + lotSize: 10, + + expectedFilled: true, + expectedAvg: (200 + 180 + 180) / 3, + expectedExtrema: 180, + }, + { + sell: false, + lots: 4, + lotSize: 10, + + expectedFilled: true, + expectedAvg: (200 + 180 + 180 + 160) / 4, + expectedExtrema: 160, + }, + { + sell: false, + lots: 5, + lotSize: 10, + + expectedFilled: false, + }, + } + + for idx, test := range tests { + avg, extrema, filled, err := ob.VWAP(test.lots, test.lotSize, test.sell) + if err != nil { + t.Fatalf("[VWAP] #%d: unexpected error: %v", idx+1, err) + } + + if filled != test.expectedFilled { + t.Fatalf("[VWAP] #%d: expected filled to be %t, got %t", idx+1, test.expectedFilled, filled) + } + + if avg != test.expectedAvg { + t.Fatalf("[VWAP] #%d: expected average to be %d, got %d", idx+1, test.expectedAvg, avg) + } + + if extrema != test.expectedExtrema { + t.Fatalf("[VWAP] #%d: expected extrema to be %d, got %d", idx+1, test.expectedExtrema, extrema) + } + } +} + func TestValidateMatchProof(t *testing.T) { mid := "mkt" ob := NewOrderBook(tLogger) diff --git a/client/rpcserver/handlers.go b/client/rpcserver/handlers.go index c4a6145c4a..0be2eee85a 100644 --- a/client/rpcserver/handlers.go +++ b/client/rpcserver/handlers.go @@ -918,19 +918,24 @@ func handleNotifications(s *RPCServer, params *RawParams) *msgjson.ResponsePaylo // parseMarketMakingConfig takes a path to a json file, parses the contents, and // returns a []*mm.BotConfig. -func parseMarketMakingConfig(path string) ([]*mm.BotConfig, error) { +func parseMarketMakingConfig(path string) ([]*mm.BotConfig, []*mm.CEXConfig, error) { + type mmConfig struct { + BotCfgs []*mm.BotConfig `json:"botCfgs"` + CexCfgs []*mm.CEXConfig `json:"cexCfgs"` + } + contents, err := ioutil.ReadFile(path) if err != nil { - return nil, err + return nil, nil, err } - var configs []*mm.BotConfig - err = json.Unmarshal(contents, &configs) + cfg := mmConfig{} + err = json.Unmarshal(contents, &cfg) if err != nil { - return nil, err + return nil, nil, err } - return configs, nil + return cfg.BotCfgs, cfg.CexCfgs, nil } func handleStartMarketMaking(s *RPCServer, params *RawParams) *msgjson.ResponsePayload { @@ -939,14 +944,14 @@ func handleStartMarketMaking(s *RPCServer, params *RawParams) *msgjson.ResponseP return usage(startMarketMakingRoute, err) } - configs, err := parseMarketMakingConfig(form.cfgFilePath) + botConfigs, cexConfigs, err := parseMarketMakingConfig(form.cfgFilePath) if err != nil { errMsg := fmt.Sprintf("unable to parse market making config: %v", err) resErr := msgjson.NewError(msgjson.RPCStartMarketMakingError, errMsg) return createResponse(startMarketMakingRoute, nil, resErr) } - err = s.mm.Run(s.ctx, configs, form.appPass) + err = s.mm.Run(s.ctx, botConfigs, cexConfigs, form.appPass) if err != nil { errMsg := fmt.Sprintf("unable to start market making: %v", err) resErr := msgjson.NewError(msgjson.RPCStartMarketMakingError, errMsg) diff --git a/dex/bip-id.go b/dex/bip-id.go index de8cb2e83e..cbe60cd736 100644 --- a/dex/bip-id.go +++ b/dex/bip-id.go @@ -3,7 +3,9 @@ package dex -import "strings" +import ( + "strings" +) var symbolBipIDs map[string]uint32 @@ -636,3 +638,22 @@ var bipIDs = map[uint32]string{ 99999998: "fluid", 99999999: "qkc", } + +// TokenChains is a map of token symbol to a list of [2]uint32, where the first +// element is the token's BIP ID and the second element is the chain's BIP ID. +var TokenChains = make(map[string][][2]uint32) + +func init() { + for id, symbol := range bipIDs { + parts := strings.Split(symbol, ".") + if len(parts) < 2 { + continue + } + tokenSymbol, chainSymbol := parts[0], parts[1] + chainID, found := BipSymbolID(chainSymbol) + if !found { + panic("unknown chain symbol: " + chainSymbol) + } + TokenChains[tokenSymbol] = append(TokenChains[tokenSymbol], [2]uint32{id, chainID}) + } +} diff --git a/dex/calc/convert.go b/dex/calc/convert.go index 8ed9a3b0b5..f97f421e23 100644 --- a/dex/calc/convert.go +++ b/dex/calc/convert.go @@ -56,3 +56,16 @@ func ConventionalRate(msgRate uint64, baseInfo, quoteInfo dex.UnitInfo) float64 func ConventionalRateAlt(msgRate uint64, baseFactor, quoteFactor uint64) float64 { return float64(msgRate) / RateEncodingFactor * float64(baseFactor) / float64(quoteFactor) } + +// MessageRate converts an exchange rate in conventional encoding to one +// in message-rate encoding using the base and quote assets' UnitInfo. +func MessageRate(conventionalRate float64, baseInfo, quoteInfo dex.UnitInfo) uint64 { + return MessageRateAlt(conventionalRate, baseInfo.Conventional.ConversionFactor, quoteInfo.Conventional.ConversionFactor) +} + +// MessageRateAlt converts an exchange rate in conventional encoding to one +// in message-rate encoding using the base and quote assets' conventional +// conversion factors. +func MessageRateAlt(conventionalRate float64, baseFactor, quoteFactor uint64) uint64 { + return uint64(conventionalRate * RateEncodingFactor / float64(baseFactor) * float64(quoteFactor)) +}