diff --git a/CHANGELOG b/CHANGELOG index 0d42cd77..33cea159 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,7 @@ +4.0.7 +- WSv2: refactor to use async/await style where possible +- WSv2: reconnect() now always resolves on completion + 4.0.6 - WSv2: fix internal flag persistence #521 diff --git a/lib/transports/ws2.js b/lib/transports/ws2.js index bac2d015..a2cc17a1 100644 --- a/lib/transports/ws2.js +++ b/lib/transports/ws2.js @@ -198,9 +198,9 @@ class WSv2 extends EventEmitter { * * @return {Promise} p */ - open () { + async open () { if (this._isOpen || this._ws !== null) { - return Promise.reject(new Error('already open')) + throw new Error('already open') } debug('connecting to %s...', this._url) @@ -219,14 +219,14 @@ class WSv2 extends EventEmitter { return new Promise((resolve, reject) => { this._ws.on('open', () => { - if (this._enabledFlags !== 0) { - this.sendEnabledFlags() - } - // call manually instead of binding to open event so it fires at the // right time this._onWSOpen() + if (this._enabledFlags !== 0) { + this.sendEnabledFlags() + } + debug('connected') resolve() }) @@ -239,16 +239,16 @@ class WSv2 extends EventEmitter { * * @param {number} code - passed to ws * @param {string} reason - passed to ws - * @return {Promise} + * @return {Promise} p */ - close (code, reason) { + async close (code, reason) { if (!this._isOpen || this._ws === null) { - return Promise.reject(new Error('not open')) + throw new Error('not open') } debug('disconnecting...') - return new Promise((resolve, reject) => { + return new Promise((resolve) => { this._ws.once('close', () => { this._isOpen = false this._ws = null @@ -275,10 +275,13 @@ class WSv2 extends EventEmitter { * @param {number?} dms - optional dead man switch flag, active 4 * @return {Promise} p */ - auth (calc, dms) { - if (!this._isOpen) return Promise.reject(new Error('not open')) + async auth (calc, dms) { + if (!this._isOpen) { + throw new Error('not open') + } + if (this._isAuthenticated) { - return Promise.reject(new Error('already authenticated')) + throw new Error('already authenticated') } const authNonce = nonce() @@ -289,7 +292,7 @@ class WSv2 extends EventEmitter { if (_isFinite(calc)) authArgs.calc = calc if (_isFinite(dms)) authArgs.dms = dms - return new Promise((resolve, reject) => { + return new Promise((resolve) => { this.once('auth', () => { debug('authenticated') resolve() @@ -317,6 +320,10 @@ class WSv2 extends EventEmitter { if (this._ws !== null && this._isOpen) { // did we get a watchdog timeout and need to close the connection? await this.close() + + return new Promise((resolve) => { + this.once(this._wasEverAuthenticated ? 'auth' : 'open', resolve) + }) } else { await this.reconnectAfterClose() // we are already closed, so reopen and re-auth } @@ -393,11 +400,13 @@ class WSv2 extends EventEmitter { /** * Trigger the packet watch-dog; called when we haven't seen a new WS packet * for longer than our WD duration (if provided) + * + * @return {Promise} p * @private */ - _triggerPacketWD () { + async _triggerPacketWD () { if (!this._packetWDDelay || !this._isOpen) { - return Promise.resolve() + return } debug( @@ -429,6 +438,9 @@ class WSv2 extends EventEmitter { }, this._packetWDDelay) } + /** + * Subscribes to previously subscribed channels, used after reconnecting + */ resubscribePreviousChannels () { Object.values(this._prevChannelMap).forEach((chan) => { const { channel } = chan @@ -458,7 +470,9 @@ class WSv2 extends EventEmitter { break } - default: {} + default: { + debug('unknown previously subscribed channel type: %s', channel) + } } }) } @@ -486,7 +500,7 @@ class WSv2 extends EventEmitter { /** * @private */ - _onWSClose () { + async _onWSClose () { this._isOpen = false this._isAuthenticated = false this._lastAuthSeq = -1 @@ -504,17 +518,15 @@ class WSv2 extends EventEmitter { if (this._isReconnecting || (this._autoReconnect && !this._isClosing)) { this._prevChannelMap = this._channelMap - setTimeout(() => { - if (this._reconnectThrottler) { - this._reconnectThrottler - .add(this.reconnectAfterClose.bind(this)) - .catch((err) => { - debug('error reconnectAfterClose: %s', err.stack) - }) - } else { - this.reconnectAfterClose().catch((err) => { - debug('error reconnectAfterClose: %s', err.stack) - }) + setTimeout(async () => { + try { + if (this._reconnectThrottler) { + await this._reconnectThrottler.add(this.reconnectAfterClose.bind(this)) + } else { + await this.reconnectAfterClose() + } + } catch (err) { + debug('error reconnectAfterClose: %s', err.stack) } }, this._reconnectDelay) } @@ -1371,7 +1383,7 @@ class WSv2 extends EventEmitter { * @param {boolean} args.audit - if true, an error is emitted on invalid seq * @return {Promise} p */ - enableSequencing (args = { audit: true }) { + async enableSequencing (args = { audit: true }) { this._seqAudit = args.audit === true return this.enableFlag(FLAGS.SEQ_ALL) @@ -1588,9 +1600,9 @@ class WSv2 extends EventEmitter { * @param {Object|Array} order * @return {Promise} p - resolves on submit notification */ - submitOrder (order) { + async submitOrder (order) { if (!this._isAuthenticated) { - return Promise.reject(new Error('not authenticated')) + throw new Error('not authenticated') } const packet = Array.isArray(order) @@ -1620,13 +1632,13 @@ class WSv2 extends EventEmitter { * @param {Object} changes - requires at least an 'id' * @return {Promise} p - resolves on receival of confirmation notification */ - updateOrder (changes = {}) { + async updateOrder (changes = {}) { const { id } = changes if (!this._isAuthenticated) { - return Promise.reject(new Error('not authenticated')) + throw new Error('not authenticated') } else if (!id) { - return Promise.reject(new Error('order ID required for update')) + throw new Error('order ID required for update') } this._sendOrderPacket([0, 'ou', null, changes]) @@ -1642,9 +1654,9 @@ class WSv2 extends EventEmitter { * @param {Object|Array|number} order * @return {Promise} p */ - cancelOrder (order) { + async cancelOrder (order) { if (!this._isAuthenticated) { - return Promise.reject(new Error('not authenticated')) + throw new Error('not authenticated') } const id = typeof order === 'number' @@ -1668,14 +1680,12 @@ class WSv2 extends EventEmitter { * @param {Object[]|Array[]|number[]} orders * @return {Promise} p */ - cancelOrders (orders) { + async cancelOrders (orders) { if (!this._isAuthenticated) { - return Promise.reject(new Error('not authenticated')) + throw new Error('not authenticated') } - return Promise.all(orders.map((order) => { - return this.cancelOrder(order) - })) + return Promise.all(orders.map(this.cancelOrder)) } /** @@ -1686,14 +1696,13 @@ class WSv2 extends EventEmitter { * @param {Object[]} opPayloads * @return {Promise} p - rejects if not authenticated */ - submitOrderMultiOp (opPayloads) { + async submitOrderMultiOp (opPayloads) { if (!this._isAuthenticated) { - return Promise.reject(new Error('not authenticated')) + throw new Error('not authenticated') } + // TODO: multi-op tracking this.send([0, 'ox_multi', null, opPayloads]) - - return Promise.resolve() // TODO: multi-op tracking } /** diff --git a/package.json b/package.json index df658dce..aa89d439 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bitfinex-api-node", - "version": "4.0.6", + "version": "4.0.7", "description": "Node reference library for Bitfinex API", "engines": { "node": ">=7" diff --git a/test/lib/transports/ws2-integration.js b/test/lib/transports/ws2-integration.js index 168fb4ce..e67dc1bc 100644 --- a/test/lib/transports/ws2-integration.js +++ b/test/lib/transports/ws2-integration.js @@ -2,6 +2,7 @@ 'use strict' const assert = require('assert') +const Promise = require('bluebird') const WSv2 = require('../../../lib/transports/ws2') const { Order } = require('bfx-api-node-models') const { MockWSv2Server } = require('bfx-api-mock-srv') @@ -9,6 +10,10 @@ const { MockWSv2Server } = require('bfx-api-mock-srv') const API_KEY = 'dummy' const API_SECRET = 'dummy' +const delay = async (ms) => { + await new Promise(resolve => setTimeout(resolve, ms)) +} + const createTestWSv2Instance = (params = {}) => { return new WSv2({ apiKey: API_KEY, @@ -20,85 +25,83 @@ const createTestWSv2Instance = (params = {}) => { } describe('WSv2 orders', () => { - it('creates & confirms orders', (done) => { + it('creates & confirms orders', async () => { const wss = new MockWSv2Server({ listen: true }) const ws = createTestWSv2Instance() - ws.open() - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { - const o = new Order({ - gid: null, - cid: 0, - type: 'EXCHANGE LIMIT', - price: 100, - amount: 1, - symbol: 'tBTCUSD' - }) - ws.submitOrder(o).then(() => { - wss.close() - done() - }).catch(done) + await ws.open() + await ws.auth() + + const o = new Order({ + gid: null, + cid: 0, + type: 'EXCHANGE LIMIT', + price: 100, + amount: 1, + symbol: 'tBTCUSD' + }) + + return ws.submitOrder(o).then(() => { + wss.close() }) }) - it('keeps orders up to date', (done) => { + it('keeps orders up to date', async () => { const wss = new MockWSv2Server({ listen: true }) const ws = createTestWSv2Instance() - ws.on('open', ws.auth.bind(ws)) - - ws.once('auth', () => { - const o = new Order({ - gid: null, - cid: 0, - type: 'EXCHANGE LIMIT', - price: 100, - amount: 1, - symbol: 'tBTCUSD' - }, ws) - - o.registerListeners() - - o.submit().then(() => { - const arr = o.serialize() - arr[16] = 256 - - wss.send([0, 'ou', arr]) - - setTimeout(() => { - assert.strictEqual(o.price, 256) - arr[16] = 150 - - wss.send([0, 'oc', arr]) - - setTimeout(() => { - assert.strictEqual(o.price, 150) - o.removeListeners() - wss.close() - done() - }, 100) - }, 100) - }).catch(done) - }) - ws.open() + await ws.open() + await ws.auth() + + const o = new Order({ + gid: null, + cid: 0, + type: 'EXCHANGE LIMIT', + price: 100, + amount: 1, + symbol: 'tBTCUSD' + }, ws) + + o.registerListeners() + + await o.submit() + + const arr = o.serialize() + arr[16] = 256 + + wss.send([0, 'ou', arr]) + + await delay(100) + + assert.strictEqual(o.price, 256) + arr[16] = 150 + + wss.send([0, 'oc', arr]) + + await delay(100) + + assert.strictEqual(o.price, 150) + o.removeListeners() + wss.close() }) - it('updateOrder: sends order changeset packet through', (done) => { + it('updateOrder: sends order changeset packet through', async () => { const wss = new MockWSv2Server() - const wsSingle = createTestWSv2Instance() - wsSingle.open() - wsSingle.on('open', wsSingle.auth.bind(wsSingle)) - wsSingle.once('auth', () => { - const o = new Order({ - id: Date.now(), - type: 'EXCHANGE LIMIT', - price: 100, - amount: 1, - symbol: 'tBTCUSD' - }, wsSingle) - - wsSingle._ws.send = (msgJSON) => { + const ws = createTestWSv2Instance() + + await ws.open() + await ws.auth() + + const o = new Order({ + id: Date.now(), + type: 'EXCHANGE LIMIT', + price: 100, + amount: 1, + symbol: 'tBTCUSD' + }, ws) + + return new Promise((resolve) => { + ws._ws.send = (msgJSON) => { const msg = JSON.parse(msgJSON) assert.strictEqual(msg[0], 0) @@ -109,83 +112,87 @@ describe('WSv2 orders', () => { assert.strictEqual(+msg[3].price, 200) wss.close() - done() + resolve() } o.update({ price: 200, delta: 1 }) }) }) - it('sends individual order packets when not buffering', (done) => { + it('sends individual order packets when not buffering', async () => { const wss = new MockWSv2Server() - const wsSingle = createTestWSv2Instance() - wsSingle.open() - wsSingle.on('open', wsSingle.auth.bind(wsSingle)) - wsSingle.once('auth', () => { - const oA = new Order({ - gid: null, - cid: Date.now(), - type: 'EXCHANGE LIMIT', - price: 100, - amount: 1, - symbol: 'tBTCUSD' - }) + const ws = createTestWSv2Instance() - const oB = new Order({ - gid: null, - cid: Date.now(), - type: 'EXCHANGE LIMIT', - price: 10, - amount: 1, - symbol: 'tETHUSD' - }) + await ws.open() + await ws.auth() - let sendN = 0 + const oA = new Order({ + gid: null, + cid: Date.now(), + type: 'EXCHANGE LIMIT', + price: 100, + amount: 1, + symbol: 'tBTCUSD' + }) - wsSingle._ws.send = (msgJSON) => { + const oB = new Order({ + gid: null, + cid: Date.now(), + type: 'EXCHANGE LIMIT', + price: 10, + amount: 1, + symbol: 'tETHUSD' + }) + + let sendN = 0 + + return new Promise(async (resolve) => { + ws._ws.send = (msgJSON) => { const msg = JSON.parse(msgJSON) assert.strictEqual(msg[1], 'on') sendN++ if (sendN === 2) { wss.close() - done() + resolve() } } - wsSingle.submitOrder(oA) - wsSingle.submitOrder(oB) + // note promises ignored + ws.submitOrder(oA) + ws.submitOrder(oB) }) }) - it('buffers order packets', (done) => { + it('buffers order packets', async () => { const wss = new MockWSv2Server() - const wsMulti = createTestWSv2Instance({ + const ws = createTestWSv2Instance({ orderOpBufferDelay: 100 }) - wsMulti.open() - wsMulti.on('open', wsMulti.auth.bind(wsMulti)) - wsMulti.once('auth', () => { - const oA = new Order({ - gid: null, - cid: Date.now(), - type: 'EXCHANGE LIMIT', - price: 100, - amount: 1, - symbol: 'tBTCUSD' - }) + await ws.open() + await ws.auth() - const oB = new Order({ - gid: null, - cid: Date.now(), - type: 'EXCHANGE LIMIT', - price: 10, - amount: 1, - symbol: 'tETHUSD' - }) + const oA = new Order({ + gid: null, + cid: Date.now(), + type: 'EXCHANGE LIMIT', + price: 100, + amount: 1, + symbol: 'tBTCUSD' + }) + + const oB = new Order({ + gid: null, + cid: Date.now(), + type: 'EXCHANGE LIMIT', + price: 10, + amount: 1, + symbol: 'tETHUSD' + }) - wsMulti._ws.send = (msgJSON) => { + return new Promise(async (resolve) => { + ws._ws.send = (msgJSON) => { const msg = JSON.parse(msgJSON) assert.strictEqual(msg[1], 'ox_multi') @@ -194,11 +201,12 @@ describe('WSv2 orders', () => { }) wss.close() - done() + resolve() } - wsMulti.submitOrder(oA) - wsMulti.submitOrder(oB) + // note promises ignored + ws.submitOrder(oA) + ws.submitOrder(oB) }) }) }) @@ -221,12 +229,14 @@ describe('WSv2 listeners', () => { assert.strictEqual(updatesSeen, 2) }) - it('tracks channel refs to auto sub/unsub', (done) => { + it('tracks channel refs to auto sub/unsub', async () => { const ws = createTestWSv2Instance() const wss = new MockWSv2Server() let subs = 0 let unsubs = 0 + await ws.open() + wss.on('message', (ws, msg) => { if (msg.event === 'subscribe' && msg.channel === 'trades') { subs++ @@ -245,11 +255,9 @@ describe('WSv2 listeners', () => { } }) - ws.on('open', () => { - ws.subscribeTrades('tBTCUSD') - ws.subscribeTrades('tBTCUSD') - ws.subscribeTrades('tBTCUSD') - }) + ws.subscribeTrades('tBTCUSD') + ws.subscribeTrades('tBTCUSD') + ws.subscribeTrades('tBTCUSD') ws.on('subscribed', () => { ws.unsubscribeTrades('tBTCUSD') @@ -259,14 +267,14 @@ describe('WSv2 listeners', () => { ws.unsubscribeTrades('tBTCUSD') }) - ws.on('unsubscribed', () => { - assert.strictEqual(subs, 1) - assert.strictEqual(unsubs, 1) - wss.close() - done() + return new Promise((resolve) => { + ws.on('unsubscribed', () => { + assert.strictEqual(subs, 1) + assert.strictEqual(unsubs, 1) + wss.close() + resolve() + }) }) - - ws.open() }) }) diff --git a/test/lib/transports/ws2-unit.js b/test/lib/transports/ws2-unit.js index a371839f..14b8251a 100644 --- a/test/lib/transports/ws2-unit.js +++ b/test/lib/transports/ws2-unit.js @@ -11,11 +11,12 @@ const API_KEY = 'dummy' const API_SECRET = 'dummy' const createTestWSv2Instance = (params = {}) => { - return new WSv2(Object.assign({ + return new WSv2({ apiKey: API_KEY, apiSecret: API_SECRET, - url: 'ws://localhost:9997' - }, params)) + url: 'ws://localhost:9997', + ...params + }) } describe('WSv2 utilities', () => { @@ -226,23 +227,33 @@ describe('WSv2 lifetime', () => { assert(flagsSent) }) - it('close: doesn\'t close if not open', (done) => { + it('close: doesn\'t close if not open', async () => { const ws = createTestWSv2Instance() - ws.close().then(() => assert(false)).catch(() => { - done() - }) + + try { + await ws.close() + assert(false) + } catch (e) {} }) - it('close: fails to close twice', (done) => { + it('close: fails to close twice', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.open() - ws.on('open', ws.close.bind(ws)) - ws.on('close', () => { - ws.close().then(() => assert(false)).catch(() => { - wss.close() - done() + + await ws.open() + + return new Promise((resolve, reject) => { + ws.on('close', async () => { + try { + await ws.close() + reject(new Error('closed twice')) + } catch (e) { + wss.close() + resolve() + } }) + + return ws.close() }) }) @@ -252,39 +263,42 @@ describe('WSv2 lifetime', () => { ws._onWSClose = () => {} // disable fallback reset await ws.open() + assert(ws._ws !== null) assert(ws._isOpen) await ws.close() + assert(ws._ws == null) assert(!ws._isOpen) wss.close() }) - it('auth: fails to auth twice', (done) => { + it('auth: fails to auth twice', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.open() - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { - ws.auth().then(() => assert(false)).catch(() => { - wss.close() - done() - }) - }) + + await ws.open() + await ws.auth() + + try { + await ws.auth() + assert(false) + } catch (e) { + wss.close() + } }) - it('auth: updates auth flag', (done) => { + it('auth: updates auth flag', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.open() - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { - assert(ws.isAuthenticated()) - wss.close() - done() - }) + + await ws.open() + await ws.auth() + + assert(ws.isAuthenticated()) + wss.close() }) it('auth: forwards calc param', async () => { @@ -294,14 +308,19 @@ describe('WSv2 lifetime', () => { await ws.open() + const send = ws.send ws.send = (data) => { assert.strictEqual(data.calc, 42) - wss.close() sentCalc = true + + ws.send = send + ws.send(data) } - ws.auth(42) // note promise ignored + await ws.auth(42) + assert(sentCalc) + wss.close() }) it('auth: forwards dms param', async () => { @@ -311,85 +330,85 @@ describe('WSv2 lifetime', () => { await ws.open() + const send = ws.send ws.send = (data) => { assert.strictEqual(data.dms, 42) - wss.close() sentDMS = true + + ws.send = send + ws.send(data) } - ws.auth(0, 42) // note promise ignored + await ws.auth(0, 42) + assert(sentDMS) + wss.close() }) - it('reconnect: connects if not already connected', (done) => { + it('reconnect: connects if not already connected', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.on('close', () => { - assert(false) - }) + let sawClose = false + let sawOpen = false - ws.on('open', () => { - wss.close() - done() - }) + ws.on('close', () => { sawClose = true }) + ws.on('open', () => { sawOpen = true }) + + await ws.reconnect() - ws.reconnect() + assert(!sawClose) + assert(sawOpen) + + wss.close() }) - it('reconnect: disconnects & connects back if currently connected', (done) => { + it('reconnect: disconnects & connects back if currently connected', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - let calls = 0 + await ws.open() - ws.on('close', () => { - if (++calls === 2) { - wss.close() - done() - } - }) + let sawClose = false + let sawOpen = false - ws.once('open', () => { - ws.reconnect() + ws.on('close', () => { sawClose = true }) + ws.on('open', () => { sawOpen = true }) - ws.once('open', () => { - if (++calls === 2) { - wss.close() - done() - } - }) - }) + await ws.reconnect() - ws.open() + assert(sawClose) + assert(sawOpen) + assert(ws.isOpen()) + + wss.close() }) - it('reconnect: automatically auths on open if previously authenticated', (done) => { + it('reconnect: automatically auths on open if previously authenticated', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() let closed = false let opened = false + let authenticated = false - ws.on('error', done) + ws.on('error', (error) => { + throw error + }) - ws.once('open', ws.auth.bind(ws)) - ws.once('auth', () => { - setTimeout(() => { - ws.once('close', () => { closed = true }) - ws.once('open', () => { opened = true }) - ws.once('auth', () => { - assert(closed) - assert(opened) - wss.close() - done() - }) + await ws.open() + await ws.auth() - ws.reconnect() - }, 50) - }) + ws.once('close', () => { closed = true }) + ws.once('open', () => { opened = true }) + ws.once('auth', () => { authenticated = true }) - ws.open() + await ws.reconnect() + + assert(closed) + assert(opened) + assert(authenticated) + wss.close() }) }) @@ -408,61 +427,61 @@ describe('WSv2 constructor', () => { }) describe('WSv2 auto reconnect', () => { - it('reconnects on close if autoReconnect is enabled', (done) => { + it('reconnects on close if autoReconnect is enabled', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance({ autoReconnect: true }) - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { - ws.reconnectAfterClose = () => done() + await ws.open() + await ws.auth() + + return new Promise((resolve) => { + ws.reconnectAfterClose = new Promise(() => resolve()) wss.close() // trigger reconnect }) - - ws.open() }) - it('respects reconnectDelay', (done) => { + it('respects reconnectDelay', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance({ autoReconnect: true, reconnectDelay: 75 }) - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { + await ws.open() + await ws.auth() + + return new Promise((resolve) => { const now = Date.now() ws.reconnectAfterClose = () => { assert((Date.now() - now) >= 70) - done() + return new Promise(() => resolve()) } wss.close() // trigger reconnect }) - - ws.open() }) - it('does not auto-reconnect if explicity closed', (done) => { + it('does not auto-reconnect if explicity closed', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance({ autoReconnect: true }) - ws.on('open', ws.auth.bind(ws)) - ws.once('auth', () => { - ws.reconnect = () => assert(false) - ws.close() + await ws.open() + await ws.auth() + + ws.reconnect = () => assert(false) + ws.close() + return new Promise((resolve) => { setTimeout(() => { wss.close() - done() + resolve() }, 50) }) - - ws.open() }) }) @@ -475,7 +494,7 @@ describe('WSv2 seq audit', () => { assert(ws.isFlagEnabled(WSv2.flags.SEQ_ALL)) }) - it('emits error on invalid seq number', (done) => { + it('emits error on invalid seq number', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance({ seqAudit: true @@ -483,36 +502,33 @@ describe('WSv2 seq audit', () => { let errorsSeen = 0 - ws.once('open', ws.auth.bind(ws)) + await ws.open() + await ws.auth() + ws.on('error', (err) => { if (err.message.indexOf('seq #') !== -1) errorsSeen++ return null }) - ws.once('auth', () => { - ws._channelMap[42] = { channel: 'trades', chanId: 42 } - - ws._onWSMessage(JSON.stringify([0, 'tu', [], 0, 0])) - ws._onWSMessage(JSON.stringify([0, 'te', [], 1, 0])) - ws._onWSMessage(JSON.stringify([0, 'wu', [], 2, 1])) - ws._onWSMessage(JSON.stringify([0, 'tu', [], 3, 2])) // - ws._onWSMessage(JSON.stringify([0, 'tu', [], 4, 4])) // error - ws._onWSMessage(JSON.stringify([0, 'tu', [], 5, 5])) - ws._onWSMessage(JSON.stringify([0, 'tu', [], 6, 6])) - ws._onWSMessage(JSON.stringify([42, [], 7])) - ws._onWSMessage(JSON.stringify([42, [], 8])) - ws._onWSMessage(JSON.stringify([42, [], 9])) // - ws._onWSMessage(JSON.stringify([42, [], 13])) // error - ws._onWSMessage(JSON.stringify([42, [], 14])) - ws._onWSMessage(JSON.stringify([42, [], 15])) - - assert.strictEqual(errorsSeen, 6) - wss.close() - done() - }) + ws._channelMap[42] = { channel: 'trades', chanId: 42 } - ws.open() + ws._onWSMessage(JSON.stringify([0, 'tu', [], 0, 0])) + ws._onWSMessage(JSON.stringify([0, 'te', [], 1, 0])) + ws._onWSMessage(JSON.stringify([0, 'wu', [], 2, 1])) + ws._onWSMessage(JSON.stringify([0, 'tu', [], 3, 2])) // + ws._onWSMessage(JSON.stringify([0, 'tu', [], 4, 4])) // error + ws._onWSMessage(JSON.stringify([0, 'tu', [], 5, 5])) + ws._onWSMessage(JSON.stringify([0, 'tu', [], 6, 6])) + ws._onWSMessage(JSON.stringify([42, [], 7])) + ws._onWSMessage(JSON.stringify([42, [], 8])) + ws._onWSMessage(JSON.stringify([42, [], 9])) // + ws._onWSMessage(JSON.stringify([42, [], 13])) // error + ws._onWSMessage(JSON.stringify([42, [], 14])) + ws._onWSMessage(JSON.stringify([42, [], 15])) + + assert.strictEqual(errorsSeen, 6) + wss.close() }) }) @@ -1338,85 +1354,79 @@ describe('WSv2 event msg handling', () => { assert(Object.keys(ws._channelMap).length === 0) }) - it('_handleInfoEvent: passes message to relevant listeners (raw access)', (done) => { + it('_handleInfoEvent: passes message to relevant listeners (raw access)', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.once('open', () => { - let n = 0 - ws._infoListeners[42] = [ - () => { n += 1 }, - () => { n += 2 } - ] + await ws.open() - ws._handleInfoEvent({ code: 42 }) + let n = 0 - assert.strictEqual(n, 3) - wss.close() - done() - }) + ws._infoListeners[42] = [ + () => { n += 1 }, + () => { n += 2 } + ] - ws.open() + ws._handleInfoEvent({ code: 42 }) + + assert.strictEqual(n, 3) + wss.close() }) - it('_handleInfoEvent: passes message to relevant listeners', (done) => { + it('_handleInfoEvent: passes message to relevant listeners', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.once('open', () => { - let n = 0 - ws.onInfoMessage(42, () => { n += 1 }) - ws.onInfoMessage(42, () => { n += 2 }) - ws._handleInfoEvent({ code: 42 }) + await ws.open() - assert.strictEqual(n, 3) - wss.close() - done() - }) + let n = 0 - ws.open() + ws.onInfoMessage(42, () => { n += 1 }) + ws.onInfoMessage(42, () => { n += 2 }) + ws._handleInfoEvent({ code: 42 }) + + assert.strictEqual(n, 3) + wss.close() }) - it('_handleInfoEvent: passes message to relevant named listeners', (done) => { + it('_handleInfoEvent: passes message to relevant named listeners', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - ws.once('open', () => { - let n = 0 - ws.onServerRestart(() => { n += 1 }) - ws.onMaintenanceStart(() => { n += 10 }) - ws.onMaintenanceEnd(() => { n += 100 }) + await ws.open() - ws._handleInfoEvent({ code: WSv2.info.SERVER_RESTART }) - ws._handleInfoEvent({ code: WSv2.info.MAINTENANCE_START }) - ws._handleInfoEvent({ code: WSv2.info.MAINTENANCE_END }) + let n = 0 - assert.strictEqual(n, 111) - wss.close() - done() - }) + ws.onServerRestart(() => { n += 1 }) + ws.onMaintenanceStart(() => { n += 10 }) + ws.onMaintenanceEnd(() => { n += 100 }) - ws.open() + ws._handleInfoEvent({ code: WSv2.info.SERVER_RESTART }) + ws._handleInfoEvent({ code: WSv2.info.MAINTENANCE_START }) + ws._handleInfoEvent({ code: WSv2.info.MAINTENANCE_END }) + + assert.strictEqual(n, 111) + wss.close() }) - it('_handleInfoEvent: closes & emits error if not on api v2', (done) => { + it('_handleInfoEvent: closes & emits error if not on api v2', async () => { const wss = new MockWSv2Server() const ws = createTestWSv2Instance() - let seen = 0 - const d = () => { - wss.close() - done() - } + await ws.open() + + return new Promise((resolve) => { + let seen = 0 + const d = () => { + wss.close() + resolve() + } - ws.once('open', () => { ws.on('error', () => { if (++seen === 2) { d() } }) ws.on('close', () => { if (++seen === 2) { d() } }) ws._handleInfoEvent({ version: 3 }) }) - - ws.open() }) it('_flushOrderOps: returned promise rejects if not authorised', (done) => { @@ -1530,11 +1540,14 @@ describe('WSv2 packet watch-dog', () => { ws._isOpen = true ws.on('error', () => {}) // invalid json to prevent message routing - ws._triggerPacketWD = () => { + ws._triggerPacketWD = function () { assert((Date.now() - now) >= 95) done() + + return Promise.resolve() } + ws._triggerPacketWD = ws._triggerPacketWD.bind(ws) ws._onWSMessage('asdf') // send first packet, init wd }) @@ -1744,7 +1757,7 @@ describe('_handleTradeMessage', () => { }) }) -describe('resubscribePreviousChannels', () => { +describe('resubscribePreviousChannels', async () => { it('resubscribes to channels in prev channel map', () => { const ws = new WSv2() let subTicker = false