From a335513ad993934ded176d191d16092983a912c0 Mon Sep 17 00:00:00 2001 From: nurdtechie98 Date: Fri, 10 Jul 2020 16:12:16 +0530 Subject: [PATCH] add delivery websocket endpoints --- node-binance-api.js | 752 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 749 insertions(+), 3 deletions(-) diff --git a/node-binance-api.js b/node-binance-api.js index fb99201e..d85018f5 100644 --- a/node-binance-api.js +++ b/node-binance-api.js @@ -29,6 +29,10 @@ let api = function Binance( options = {} ) { let fstreamSingle = 'wss://fstream.binance.com/ws/'; let fstreamSingleTest = 'wss://stream.binancefuture.com/ws/'; let fstreamTest = 'wss://stream.binancefuture.com/stream?streams='; + let dstream = 'wss://dstream.binance.com/stream?streams='; + let dstreamSingle = 'wss://dstream.binance.com/ws/'; + let dstreamSingleTest = 'wss://dstream.binancefuture.com/ws/'; + let dstreamTest = 'wss://dstream.binancefuture.com/stream?streams='; let stream = 'wss://stream.binance.com:9443/ws/'; let combineStream = 'wss://stream.binance.com:9443/stream?streams='; const userAgent = 'Mozilla/4.0 (compatible; Node Binance API)'; @@ -40,6 +44,12 @@ let api = function Binance( options = {} ) { Binance.futuresTicks = {}; Binance.futuresRealtime = {}; Binance.futuresKlineQueue = {}; + Binance.deliverySubscriptions = {}; + Binance.deliveryInfo = {}; + Binance.deliveryMeta = {}; + Binance.deliveryTicks = {}; + Binance.deliveryRealtime = {}; + Binance.deliveryKlineQueue = {}; Binance.depthCache = {}; Binance.depthCacheContext = {}; Binance.ohlcLatest = {}; @@ -97,6 +107,10 @@ let api = function Binance( options = {} ) { if ( typeof urls.fstreamSingle === 'string' ) fstreamSingle = urls.fstreamSingle; if ( typeof urls.fstreamTest === 'string' ) fstreamTest = urls.fstreamTest; if ( typeof urls.fstreamSingleTest === 'string' ) fstreamSingleTest = urls.fstreamSingleTest; + if ( typeof urls.dstream === 'string' ) dstream = urls.dstream; + if ( typeof urls.dstreamSingle === 'string' ) dstreamSingle = urls.dstreamSingle; + if ( typeof urls.dstreamTest === 'string' ) dstreamTest = urls.dstreamTest; + if ( typeof urls.dstreamSingleTest === 'string' ) dstreamSingleTest = urls.dstreamSingleTest; } if ( Binance.options.useServerTime ) { publicRequest( base + 'v3/time', {}, function ( error, response ) { @@ -1171,6 +1185,471 @@ let api = function Binance( options = {} ) { return friendlyData( data ); } + /** + * Delivery heartbeat code with a shared single interval tick + * @return {undefined} + */ + const deliverySocketHeartbeat = () => { + /* Sockets removed from subscriptions during a manual terminate() + will no longer be at risk of having functions called on them */ + for ( let endpointId in Binance.deliverySubscriptions ) { + const ws = Binance.deliverySubscriptions[endpointId]; + if ( ws.isAlive ) { + ws.isAlive = false; + if ( ws.readyState === WebSocket.OPEN ) ws.ping( noop ); + } else { + if ( Binance.options.verbose ) Binance.options.log( `Terminating zombie delivery WebSocket: ${ ws.endpoint }` ); + if ( ws.readyState === WebSocket.OPEN ) ws.terminate(); + } + } + }; + + /** + * Called when a delivery socket is opened, subscriptions are registered for later reference + * @param {function} openCallback - a callback function + * @return {undefined} + */ + const handleDeliverySocketOpen = function ( openCallback ) { + this.isAlive = true; + if ( Object.keys( Binance.deliverySubscriptions ).length === 0 ) { + Binance.socketHeartbeatInterval = setInterval( deliverySocketHeartbeat, 30000 ); + } + Binance.deliverySubscriptions[this.endpoint] = this; + if ( typeof openCallback === 'function' ) openCallback( this.endpoint ); + }; + + /** + * Called when delivery websocket is closed, subscriptions are de-registered for later reference + * @param {boolean} reconnect - true or false to reconnect the socket + * @param {string} code - code associated with the socket + * @param {string} reason - string with the response + * @return {undefined} + */ + const handleDeliverySocketClose = function ( reconnect, code, reason ) { + delete Binance.deliverySubscriptions[this.endpoint]; + if ( Binance.deliverySubscriptions && Object.keys( Binance.deliverySubscriptions ).length === 0 ) { + clearInterval( Binance.socketHeartbeatInterval ); + } + Binance.options.log( 'Delivery WebSocket closed: ' + this.endpoint + + ( code ? ' (' + code + ')' : '' ) + + ( reason ? ' ' + reason : '' ) ); + if ( Binance.options.reconnect && this.reconnect && reconnect ) { + if ( this.endpoint && parseInt( this.endpoint.length, 10 ) === 60 ) Binance.options.log( 'Delivery account data WebSocket reconnecting...' ); + else Binance.options.log( 'Delivery WebSocket reconnecting: ' + this.endpoint + '...' ); + try { + reconnect(); + } catch ( error ) { + Binance.options.log( 'Delivery WebSocket reconnect error: ' + error.message ); + } + } + }; + + /** + * Called when a delivery websocket errors + * @param {object} error - error object message + * @return {undefined} + */ + const handleDeliverySocketError = function ( error ) { + Binance.options.log( 'Delivery WebSocket error: ' + this.endpoint + + ( error.code ? ' (' + error.code + ')' : '' ) + + ( error.message ? ' ' + error.message : '' ) ); + }; + + /** + * Called on each delivery socket heartbeat + * @return {undefined} + */ + const handleDeliverySocketHeartbeat = function () { + this.isAlive = true; + }; + + /** + * Used to subscribe to a single delivery websocket endpoint + * @param {string} endpoint - endpoint to connect to + * @param {function} callback - the function to call when information is received + * @param {object} params - Optional reconnect {boolean} (whether to reconnect on disconnect), openCallback {function}, id {string} + * @return {WebSocket} - websocket reference + */ + const deliverySubscribeSingle = function ( endpoint, callback, params = {} ) { + if ( typeof params === 'boolean' ) params = { reconnect: params }; + if ( !params.reconnect ) params.reconnect = false; + if ( !params.openCallback ) params.openCallback = false; + if ( !params.id ) params.id = false; + let httpsproxy = process.env.https_proxy || false; + let socksproxy = process.env.socks_proxy || false; + let ws = false; + if ( socksproxy !== false ) { + socksproxy = proxyReplacewithIp( socksproxy ); + if ( Binance.options.verbose ) Binance.options.log( `deliverySubscribeSingle: using socks proxy server: ${ socksproxy }` ); + let agent = new SocksProxyAgent( { + protocol: parseProxy( socksproxy )[0], + host: parseProxy( socksproxy )[1], + port: parseProxy( socksproxy )[2] + } ); + ws = new WebSocket( ( Binance.options.test ? dstreamSingleTest : dstreamSingle ) + endpoint, { agent } ); + } else if ( httpsproxy !== false ) { + if ( Binance.options.verbose ) Binance.options.log( `deliverySubscribeSingle: using proxy server: ${ agent }` ); + let config = url.parse( httpsproxy ); + let agent = new HttpsProxyAgent( config ); + ws = new WebSocket( ( Binance.options.test ? dstreamSingleTest : dstreamSingle ) + endpoint, { agent } ); + } else { + ws = new WebSocket( ( Binance.options.test ? dstreamSingleTest : dstreamSingle ) + endpoint ); + } + + if ( Binance.options.verbose ) Binance.options.log( 'deliverySubscribeSingle: Subscribed to ' + endpoint ); + ws.reconnect = Binance.options.reconnect; + ws.endpoint = endpoint; + ws.isAlive = false; + ws.on( 'open', handleDeliverySocketOpen.bind( ws, params.openCallback ) ); + ws.on( 'pong', handleDeliverySocketHeartbeat ); + ws.on( 'error', handleDeliverySocketError ); + ws.on( 'close', handleDeliverySocketClose.bind( ws, params.reconnect ) ); + ws.on( 'message', data => { + try { + callback( JSON.parse( data ) ); + } catch ( error ) { + Binance.options.log( 'Parse error: ' + error.message ); + } + } ); + return ws; + }; + + /** + * Used to subscribe to a combined delivery websocket endpoint + * @param {string} streams - streams to connect to + * @param {function} callback - the function to call when information is received + * @param {object} params - Optional reconnect {boolean} (whether to reconnect on disconnect), openCallback {function}, id {string} + * @return {WebSocket} - websocket reference + */ + const deliverySubscribe = function ( streams, callback, params = {} ) { + if ( typeof streams === 'string' ) return deliverySubscribeSingle( streams, callback, params ); + if ( typeof params === 'boolean' ) params = { reconnect: params }; + if ( !params.reconnect ) params.reconnect = false; + if ( !params.openCallback ) params.openCallback = false; + if ( !params.id ) params.id = false; + let httpsproxy = process.env.https_proxy || false; + let socksproxy = process.env.socks_proxy || false; + const queryParams = streams.join( '/' ); + let ws = false; + if ( socksproxy !== false ) { + socksproxy = proxyReplacewithIp( socksproxy ); + if ( Binance.options.verbose ) Binance.options.log( `deliverySubscribe: using socks proxy server ${ socksproxy }` ); + let agent = new SocksProxyAgent( { + protocol: parseProxy( socksproxy )[0], + host: parseProxy( socksproxy )[1], + port: parseProxy( socksproxy )[2] + } ); + ws = new WebSocket( ( Binance.options.test ? dstreamTest : dstream ) + queryParams, { agent } ); + } else if ( httpsproxy !== false ) { + if ( Binance.options.verbose ) Binance.options.log( `deliverySubscribe: using proxy server ${ httpsproxy }` ); + let config = url.parse( httpsproxy ); + let agent = new HttpsProxyAgent( config ); + ws = new WebSocket( ( Binance.options.test ? dstreamTest : dstream ) + queryParams, { agent } ); + } else { + ws = new WebSocket( ( Binance.options.test ? dstreamTest : dstream ) + queryParams ); + } + + ws.reconnect = Binance.options.reconnect; + ws.endpoint = stringHash( queryParams ); + ws.isAlive = false; + if ( Binance.options.verbose ) { + Binance.options.log( `deliverySubscribe: Subscribed to [${ ws.endpoint }] ${ queryParams }` ); + } + ws.on( 'open', handleDeliverySocketOpen.bind( ws, params.openCallback ) ); + ws.on( 'pong', handleDeliverySocketHeartbeat ); + ws.on( 'error', handleDeliverySocketError ); + ws.on( 'close', handleDeliverySocketClose.bind( ws, params.reconnect ) ); + ws.on( 'message', data => { + try { + callback( JSON.parse( data ).data ); + } catch ( error ) { + Binance.options.log( `deliverySubscribe: Parse error: ${ error.message }` ); + } + } ); + return ws; + }; + + /** + * Used to terminate a delivery websocket + * @param {string} endpoint - endpoint identifier associated with the web socket + * @param {boolean} reconnect - auto reconnect after termination + * @return {undefined} + */ + const deliveryTerminate = function ( endpoint, reconnect = false ) { + let ws = Binance.deliverySubscriptions[endpoint]; + if ( !ws ) return; + ws.removeAllListeners( 'message' ); + ws.reconnect = reconnect; + ws.terminate(); + } + + /** + * Combines all delivery OHLC data with the latest update + * @param {string} symbol - the symbol + * @param {string} interval - time interval + * @return {array} - interval data for given symbol + */ + const deliveryKlineConcat = ( symbol, interval ) => { + let output = Binance.deliveryTicks[symbol][interval]; + if ( typeof Binance.deliveryRealtime[symbol][interval].time === 'undefined' ) return output; + const time = Binance.deliveryRealtime[symbol][interval].time; + const last_updated = Object.keys( Binance.deliveryTicks[symbol][interval] ).pop(); + if ( time >= last_updated ) { + output[time] = Binance.deliveryRealtime[symbol][interval]; + //delete output[time].time; + output[last_updated].isFinal = true; + output[time].isFinal = false; + } + return output; + }; + + /** + * Used for websocket delivery @kline + * @param {string} symbol - the symbol + * @param {object} kline - object with kline info + * @param {string} firstTime - time filter + * @return {undefined} + */ + const deliveryKlineHandler = ( symbol, kline, firstTime = 0 ) => { + // eslint-disable-next-line no-unused-vars + let { e: eventType, E: eventTime, k: ticks } = kline; + // eslint-disable-next-line no-unused-vars + let { o: open, h: high, l: low, c: close, v: volume, i: interval, x: isFinal, q: quoteVolume, V: takerBuyBaseVolume, Q: takerBuyQuoteVolume, n: trades, t: time, T:closeTime } = ticks; + if ( time <= firstTime ) return; + if ( !isFinal ) { + // if ( typeof Binance.futuresRealtime[symbol][interval].time !== 'undefined' ) { + // if ( Binance.futuresRealtime[symbol][interval].time > time ) return; + // } + Binance.deliveryRealtime[symbol][interval] = { time, closeTime, open, high, low, close, volume, quoteVolume, takerBuyBaseVolume, takerBuyQuoteVolume, trades, isFinal }; + return; + } + const first_updated = Object.keys( Binance.deliveryTicks[symbol][interval] ).shift(); + if ( first_updated ) delete Binance.deliveryTicks[symbol][interval][first_updated]; + Binance.deliveryTicks[symbol][interval][time] = { time, closeTime, open, high, low, close, volume, quoteVolume, takerBuyBaseVolume, takerBuyQuoteVolume, trades, isFinal:false }; + }; + + /** + * Converts the delivery liquidation stream data into a friendly object + * @param {object} data - liquidation data callback data type + * @return {object} - user friendly data type + */ + const dLiquidationConvertData = data => { + let eventType = data.e, eventTime = data.E; + let { + s: symbol, + S: side, + o: orderType, + f: timeInForce, + q: origAmount, + p: price, + ap: avgPrice, + X: orderStatus, + l: lastFilledQty, + z: totalFilledQty, + T: tradeTime + } = data.o; + return { symbol, side, orderType, timeInForce, origAmount, price, avgPrice, orderStatus, lastFilledQty, totalFilledQty, eventType, tradeTime, eventTime }; + }; + + /** + * Converts the delivery ticker stream data into a friendly object + * @param {object} data - user data callback data type + * @return {object} - user friendly data type + */ + const dTickerConvertData = data => { + let friendlyData = data => { + let { + e: eventType, + E: eventTime, + s: symbol, + p: priceChange, + P: percentChange, + w: averagePrice, + c: close, + Q: closeQty, + o: open, + h: high, + l: low, + v: volume, + q: quoteVolume, + O: openTime, + C: closeTime, + F: firstTradeId, + L: lastTradeId, + n: numTrades + } = data; + return { + eventType, + eventTime, + symbol, + priceChange, + percentChange, + averagePrice, + close, + closeQty, + open, + high, + low, + volume, + quoteVolume, + openTime, + closeTime, + firstTradeId, + lastTradeId, + numTrades + }; + } + if ( Array.isArray( data ) ) { + const result = []; + for ( let obj of data ) { + result.push( friendlyData( obj ) ); + } + return result; + } + return friendlyData( data ); + } + + /** + * Converts the delivery miniTicker stream data into a friendly object + * @param {object} data - user data callback data type + * @return {object} - user friendly data type + */ + const dMiniTickerConvertData = data => { + let friendlyData = data => { + let { + e: eventType, + E: eventTime, + s: symbol, + c: close, + o: open, + h: high, + l: low, + v: volume, + q: quoteVolume + } = data; + return { + eventType, + eventTime, + symbol, + close, + open, + high, + low, + volume, + quoteVolume + }; + } + if ( Array.isArray( data ) ) { + const result = []; + for ( let obj of data ) { + result.push( friendlyData( obj ) ); + } + return result; + } + return friendlyData( data ); + } + + /** + * Converts the delivery bookTicker stream data into a friendly object + * @param {object} data - user data callback data type + * @return {object} - user friendly data type + */ + const dBookTickerConvertData = data => { + let { + u: updateId, + s: symbol, + b: bestBid, + B: bestBidQty, + a: bestAsk, + A: bestAskQty + } = data; + return { + updateId, + symbol, + bestBid, + bestBidQty, + bestAsk, + bestAskQty + }; + } + + /** + * Converts the delivery markPrice stream data into a friendly object + * @param {object} data - user data callback data type + * @return {object} - user friendly data type + */ + const dMarkPriceConvertData = data => { + let friendlyData = data => { + let { + e: eventType, + E: eventTime, + s: symbol, + p: markPrice, + r: fundingRate, + T: fundingTime + } = data; + return { + eventType, + eventTime, + symbol, + markPrice, + fundingRate, + fundingTime + }; + } + if ( Array.isArray( data ) ) { + const result = []; + for ( let obj of data ) { + result.push( friendlyData( obj ) ); + } + return result; + } + return friendlyData( data ); + } + + /** + * Converts the delivery aggTrade stream data into a friendly object + * @param {object} data - user data callback data type + * @return {object} - user friendly data type + */ + const dAggTradeConvertData = data => { + let friendlyData = data => { + let { + e: eventType, + E: eventTime, + s: symbol, + a: aggTradeId, + p: price, + q: amount, + f: firstTradeId, + l: lastTradeId, + T: timestamp, + m: maker + } = data; + return { + eventType, + eventTime, + symbol, + aggTradeId, + price, + amount, + total: price * amount, + firstTradeId, + lastTradeId, + timestamp, + maker + }; + } + if ( Array.isArray( data ) ) { + const result = []; + for ( let obj of data ) { + result.push( friendlyData( obj ) ); + } + return result; + } + return friendlyData( data ); + } + /** * Used as part of the user data websockets callback * @param {object} data - user data callback data type @@ -1434,6 +1913,26 @@ let api = function Binance( options = {} ) { } }; + /** + * Used by delivery websockets chart cache + * @param {string} symbol - symbol to get candlestick info + * @param {string} interval - time interval, 1m, 3m, 5m .... + * @param {array} ticks - tick array + * @return {undefined} + */ + const deliveryKlineData = ( symbol, interval, ticks ) => { + let last_time = 0; + if ( isIterable( ticks ) ) { + for ( let tick of ticks ) { + // eslint-disable-next-line no-unused-vars + let [time, open, high, low, close, volume, closeTime, quoteVolume, trades, takerBuyBaseVolume, takerBuyQuoteVolume, ignored] = tick; + Binance.deliveryTicks[symbol][interval][time] = { time, closeTime, open, high, low, close, volume, quoteVolume, takerBuyBaseVolume, takerBuyQuoteVolume, trades }; + last_time = time; + } + Binance.deliveryMeta[symbol][interval].timestamp = last_time; + } + }; + /** * Used for /depth endpoint * @param {object} data - containing the bids and asks @@ -3160,20 +3659,20 @@ let api = function Binance( options = {} ) { return promiseRequest( 'v1/klines', params, {base:dapi} ); }, - deliveryContinuousKlines: async (pair, contractType = "CURRENT_QUARTER", interval = "30m", params = {}) => { + deliveryContinuousKlines: async ( pair, contractType = "CURRENT_QUARTER", interval = "30m", params = {} ) => { params.pair = pair; params.interval = interval; pairs.contractType = contractType; return promiseRequest( 'v1/continuousKlines', params, {base:dapi} ); }, - deliveryIndexKlines: async (pair, interval = "30m", params = {}) => { + deliveryIndexKlines: async ( pair, interval = "30m", params = {} ) => { params.pair = pair; params.interval = interval; return promiseRequest( 'v1/indexPriceKlines', params, {base:dapi} ); }, - deliveryMarkPriceKlines: async (symbol, interval = "30m", params = {} ) => { + deliveryMarkPriceKlines: async ( symbol, interval = "30m", params = {} ) => { params.symbol = symbol; params.interval = interval; return promiseRequest( 'v1/markPriceKlines', params, {base:dapi} ); @@ -3835,6 +4334,253 @@ let api = function Binance( options = {} ) { return subscription.endpoint; }, + // Delivery WebSocket Functions: + /** + * Subscribe to a single delivery websocket + * @param {string} url - the delivery websocket endpoint + * @param {function} callback - optional execution callback + * @param {object} params - Optional reconnect {boolean} (whether to reconnect on disconnect), openCallback {function}, id {string} + * @return {WebSocket} the websocket reference + */ + deliverySubscribeSingle: function ( url, callback, params = {} ) { + return deliverySubscribeSingle( url, callback, params ); + }, + + /** + * Subscribe to a combined delivery websocket + * @param {string} streams - the list of websocket endpoints to connect to + * @param {function} callback - optional execution callback + * @param {object} params - Optional reconnect {boolean} (whether to reconnect on disconnect), openCallback {function}, id {string} + * @return {WebSocket} the websocket reference + */ + deliverySubscribe: function ( streams, callback, params = {} ) { + return deliverySubscribe( streams, callback, params ); + }, + + /** + * Returns the known delivery websockets subscriptions + * @return {array} array of delivery websocket subscriptions + */ + deliverySubscriptions: function() { + return Binance.deliverySubscriptions; + }, + + /** + * Terminates a delivery websocket + * @param {string} endpoint - the string associated with the endpoint + * @return {undefined} + */ + deliveryTerminate: function ( endpoint ) { + if ( Binance.options.verbose ) Binance.options.log( 'Delivery WebSocket terminating:', endpoint ); + return deliveryTerminate( endpoint ); + }, + + /** + * Delivery WebSocket aggregated trades + * @param {array/string} symbols - an array or string of symbols to query + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryAggTradeStream: function deliveryAggTradeStream( symbols, callback ) { + let reconnect = () => { + if ( Binance.options.reconnect ) deliveryAggTradeStream( symbols, callback ); + }; + let subscription, cleanCallback = data => callback( dAggTradeConvertData( data ) ); + if ( Array.isArray( symbols ) ) { + if ( !isArrayUnique( symbols ) ) throw Error( 'deliveryAggTradeStream: "symbols" cannot contain duplicate elements.' ); + let streams = symbols.map( symbol => symbol.toLowerCase() + '@aggTrade' ); + subscription = deliverySubscribe( streams, cleanCallback, { reconnect } ); + } else { + let symbol = symbols; + subscription = deliverySubscribeSingle( symbol.toLowerCase() + '@aggTrade', cleanCallback, { reconnect } ); + } + return subscription.endpoint; + }, + + /** + * Delivery WebSocket mark price + * @param {symbol} symbol name or false. can also be a callback + * @param {function} callback - callback function + * @param {string} speed - 1 second updates. leave blank for default 3 seconds + * @return {string} the websocket endpoint + */ + deliveryMarkPriceStream: function dMarkPriceStream( symbol = false, callback = console.log, speed = '@1s' ) { + if ( typeof symbol == 'function' ) { + callback = symbol; + symbol = false; + } + let reconnect = () => { + if ( Binance.options.reconnect ) dMarkPriceStream( symbol, callback ); + }; + const endpoint = symbol ? `${ symbol.toLowerCase() }@markPrice` : '!markPrice@arr' + let subscription = deliverySubscribeSingle( endpoint+speed, data => callback( dMarkPriceConvertData( data ) ), { reconnect } ); + return subscription.endpoint; + }, + + /** + * Delivery WebSocket liquidations stream + * @param {symbol} symbol name or false. can also be a callback + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryLiquidationStream: function dLiquidationStream( symbol = false, callback = console.log ) { + if ( typeof symbol == 'function' ) { + callback = symbol; + symbol = false; + } + let reconnect = () => { + if ( Binance.options.reconnect ) dLiquidationStream( symbol, callback ); + }; + const endpoint = symbol ? `${ symbol.toLowerCase() }@forceOrder` : '!forceOrder@arr' + let subscription = deliverySubscribeSingle( endpoint, data => callback( dLiquidationConvertData( data ) ), { reconnect } ); + return subscription.endpoint; + }, + + /** + * Delivery WebSocket prevDay ticker + * @param {symbol} symbol name or false. can also be a callback + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryTickerStream: function dTickerStream( symbol = false, callback = console.log ) { + if ( typeof symbol == 'function' ) { + callback = symbol; + symbol = false; + } + let reconnect = () => { + if ( Binance.options.reconnect ) dTickerStream( symbol, callback ); + }; + const endpoint = symbol ? `${ symbol.toLowerCase() }@ticker` : '!ticker@arr' + let subscription = deliverySubscribeSingle( endpoint, data => callback( dTickerConvertData( data ) ), { reconnect } ); + return subscription.endpoint; + }, + + /** + * Delivery WebSocket miniTicker + * @param {symbol} symbol name or false. can also be a callback + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryMiniTickerStream: function dMiniTickerStream( symbol = false, callback = console.log ) { + if ( typeof symbol == 'function' ) { + callback = symbol; + symbol = false; + } + let reconnect = () => { + if ( Binance.options.reconnect ) dMiniTickerStream( symbol, callback ); + }; + const endpoint = symbol ? `${ symbol.toLowerCase() }@miniTicker` : '!miniTicker@arr' + let subscription = deliverySubscribeSingle( endpoint, data => callback( dMiniTickerConvertData( data ) ), { reconnect } ); + return subscription.endpoint; + }, + + /** + * Delivery WebSocket bookTicker + * @param {symbol} symbol name or false. can also be a callback + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryBookTickerStream: function dBookTickerStream( symbol = false, callback = console.log ) { + if ( typeof symbol == 'function' ) { + callback = symbol; + symbol = false; + } + let reconnect = () => { + if ( Binance.options.reconnect ) dBookTickerStream( symbol, callback ); + }; + const endpoint = symbol ? `${ symbol.toLowerCase() }@bookTicker` : '!bookTicker' + let subscription = deliverySubscribeSingle( endpoint, data => callback( dBookTickerConvertData( data ) ), { reconnect } ); + return subscription.endpoint; + }, + + /** + * Websocket delivery klines + * @param {array/string} symbols - an array or string of symbols to query + * @param {string} interval - the time interval + * @param {function} callback - callback function + * @param {int} limit - maximum results, no more than 1000 + * @return {string} the websocket endpoint + */ + deliveryChart: async function deliveryChart( symbols, interval, callback, limit = 500 ) { + let reconnect = () => { + if ( Binance.options.reconnect ) deliveryChart( symbols, interval, callback, limit ); + }; + + let deliveryChartInit = symbol => { + if ( typeof Binance.deliveryMeta[symbol] === 'undefined' ) Binance.deliveryMeta[symbol] = {}; + if ( typeof Binance.deliveryMeta[symbol][interval] === 'undefined' ) Binance.deliveryMeta[symbol][interval] = {}; + if ( typeof Binance.deliveryTicks[symbol] === 'undefined' ) Binance.deliveryTicks[symbol] = {}; + if ( typeof Binance.deliveryTicks[symbol][interval] === 'undefined' ) Binance.deliveryTicks[symbol][interval] = {}; + if ( typeof Binance.deliveryRealtime[symbol] === 'undefined' ) Binance.deliveryRealtime[symbol] = {}; + if ( typeof Binance.deliveryRealtime[symbol][interval] === 'undefined' ) Binance.deliveryRealtime[symbol][interval] = {}; + if ( typeof Binance.deliveryKlineQueue[symbol] === 'undefined' ) Binance.deliveryKlineQueue[symbol] = {}; + if ( typeof Binance.deliveryKlineQueue[symbol][interval] === 'undefined' ) Binance.deliveryKlineQueue[symbol][interval] = []; + Binance.deliveryMeta[symbol][interval].timestamp = 0; + } + + let handleDeliveryKlineStream = kline => { + let symbol = kline.s, interval = kline.k.i; + if ( !Binance.deliveryMeta[symbol][interval].timestamp ) { + if ( typeof ( Binance.deliveryKlineQueue[symbol][interval] ) !== 'undefined' && kline !== null ) { + Binance.deliveryKlineQueue[symbol][interval].push( kline ); + } + } else { + //Binance.options.log('futures klines at ' + kline.k.t); + deliveryKlineHandler( symbol, kline ); + if ( callback ) callback( symbol, interval, deliveryKlineConcat( symbol, interval ) ); + } + }; + + let getDeliveryKlineSnapshot = async ( symbol, limit = 500 ) => { + let data = await promiseRequest( 'v1/klines', { symbol, interval, limit }, { base:fapi } ); + deliveryKlineData( symbol, interval, data ); + //Binance.options.log('/delivery klines at ' + Binance.deliveryMeta[symbol][interval].timestamp); + if ( typeof Binance.deliveryKlineQueue[symbol][interval] !== 'undefined' ) { + for ( let kline of Binance.deliveryKlineQueue[symbol][interval] ) deliveryKlineHandler( symbol, kline, Binance.deliveryMeta[symbol][interval].timestamp ); + delete Binance.deliveryKlineQueue[symbol][interval]; + } + if ( callback ) callback( symbol, interval, deliveryKlineConcat( symbol, interval ) ); + }; + + let subscription; + if ( Array.isArray( symbols ) ) { + if ( !isArrayUnique( symbols ) ) throw Error( 'deliveryChart: "symbols" array cannot contain duplicate elements.' ); + symbols.forEach( deliveryChartInit ); + let streams = symbols.map( symbol => `${ symbol.toLowerCase() }@kline_${ interval }` ); + subscription = deliverySubscribe( streams, handleDeliveryKlineStream, reconnect ); + symbols.forEach( element => getDeliveryKlineSnapshot( element, limit ) ); + } else { + let symbol = symbols; + deliveryChartInit( symbol ); + subscription = deliverySubscribeSingle( symbol.toLowerCase() + '@kline_' + interval, handleDeliveryKlineStream, reconnect ); + getDeliveryKlineSnapshot( symbol, limit ); + } + return subscription.endpoint; + }, + + /** + * Websocket delivery candlesticks + * @param {array/string} symbols - an array or string of symbols to query + * @param {string} interval - the time interval + * @param {function} callback - callback function + * @return {string} the websocket endpoint + */ + deliveryCandlesticks: function deliveryCandlesticks( symbols, interval, callback ) { + let reconnect = () => { + if ( Binance.options.reconnect ) deliveryCandlesticks( symbols, interval, callback ); + }; + let subscription; + if ( Array.isArray( symbols ) ) { + if ( !isArrayUnique( symbols ) ) throw Error( 'deliveryCandlesticks: "symbols" array cannot contain duplicate elements.' ); + let streams = symbols.map( symbol => symbol.toLowerCase() + '@kline_' + interval ); + subscription = deliverySubscribe( streams, callback, {reconnect} ); + } else { + let symbol = symbols.toLowerCase(); + subscription = deliverySubscribeSingle( symbol + '@kline_' + interval, callback, {reconnect} ); + } + return subscription.endpoint; + }, + websockets: { /** * Userdata websockets function