From 878bf7ee1c78942bc31728764852118d662250c4 Mon Sep 17 00:00:00 2001 From: Daniel James Date: Wed, 29 Nov 2017 13:50:34 -0800 Subject: [PATCH] feat(realtime): Handled resubscribing on reconnect (#33) --- src/RealTimeClient.js | 129 +++++++++++++++++++++++++----- src/RealTimeClient.test.js | 160 +++++++++++++++++++++++++++---------- src/mocks.js | 6 +- 3 files changed, 230 insertions(+), 65 deletions(-) diff --git a/src/RealTimeClient.js b/src/RealTimeClient.js index f67b163..d269333 100644 --- a/src/RealTimeClient.js +++ b/src/RealTimeClient.js @@ -5,6 +5,8 @@ import { DEFAULT_TRACK_API_HOST } from './Client'; const WEBSOCKET_READY_STATES = { CONNECTING: 0, OPEN: 1, + CLOSING: 2, + CLOSED: 3, }; /** @@ -19,6 +21,8 @@ class RealTimeClient { * @param {Object} options Options for using the client to connect to the Track API * @param {boolean} [options.reconnectOnClose] Whether to re-establish a connection when the * current connection is closed. Defaults to true. + * @param {number} [options.reconnectTimeout] Number of milliseconds to wait before reattempting + * a connection * @param {string} [options.realTimeUri] The URI for the WebSocket connection to the RealTime API. * Defaults to production. */ @@ -43,6 +47,8 @@ class RealTimeClient { // dummy object for safe calling semantics later this.connection = {}; + this.initializing = false; + this.timeoutHandles = []; this.queuedMessages = []; @@ -64,6 +70,7 @@ class RealTimeClient { closeConnection() { const { reconnectOnClose } = this.options; this.options.reconnectOnClose = false; + this.timeoutHandles.forEach(clearTimeout); if (this.connection && typeof this.connection.close === 'function') { this.connection.close(); } @@ -73,14 +80,29 @@ class RealTimeClient { /** * Creates a new WebSocket connection to the Track Real Time API and sends authentication * messages. + * @param {Function} [onInitialize] Function to execute upon initializing a connection * @returns {void} */ - createConnection() { - const connection = new WebSocket(this.options.realTimeUri); - connection.onopen = this.onConnectionOpened; - connection.onclose = this.onConnectionClosed; - connection.onmessage = this.onMessageReceived; - this.connection = connection; + createConnection(onInitialize) { + if (this.initializing) return; + const readyState = this.connection.readyState || WEBSOCKET_READY_STATES.CLOSED; + switch (readyState) { + case WEBSOCKET_READY_STATES.OPEN: + case WEBSOCKET_READY_STATES.CONNECTING: + case WEBSOCKET_READY_STATES.CLOSING: + return; + default: + { + this.initializing = true; + const connection = new WebSocket(this.options.realTimeUri); + connection.onopen = this.onConnectionOpened; + connection.onclose = this.onConnectionClosed; + connection.onmessage = this.onMessageReceived; + this.connection = connection; + if (onInitialize) onInitialize(); + } + break; + } } /** @@ -89,11 +111,43 @@ class RealTimeClient { * @returns {void} */ onConnectionClosed() { - if (this.options.reconnectOnClose) { + const { + reconnectOnClose = false, + reconnectTimeout = 0, + } = this.options; + + if (reconnectOnClose) { // the WebSocket API doesn't offer a way to re-open a closed connection. // remove our connection and create a new one. - delete this.connection; - this.createConnection(); + const reconnect = () => { + this.createConnection(() => Object.keys(this.subscriptions) + .map(subscriptionId => ({ + ...this.subscriptions[subscriptionId], + subscriptionId, + })) + .forEach(({ + subscriptionId, + handler, + messageCreator, + mutableSubscriptionContainer, + subscriptionEndRejecter, + subscriptionEndResolver, + }) => { + this.sendStartSubscriptionMessage( + messageCreator, + handler, + mutableSubscriptionContainer, + () => { }, + () => new Error('Could not resume subscription'), + subscriptionEndResolver, + subscriptionEndRejecter, + ) + .then(() => { + delete this.subscriptions[subscriptionId]; + }); + })); + }; + this.timeoutHandles.push(setTimeout(reconnect, reconnectTimeout)); } } @@ -103,6 +157,7 @@ class RealTimeClient { * @returns {void} */ onConnectionOpened() { + this.initializing = false; this.sendAuthentication() .then(() => { this.queuedMessages.forEach((queuedMessage) => { @@ -135,6 +190,8 @@ class RealTimeClient { { const { handler, + messageCreator, + mutableSubscriptionContainer, subscriptionStartResolver, subscriptionEndRejecter, subscriptionEndResolver, @@ -147,8 +204,12 @@ class RealTimeClient { queuedMessages.forEach(qm => handler(qm)); } + mutableSubscriptionContainer.subscriptionId = message.subscription_id; + this.subscriptions[message.subscription_id] = { handler, + messageCreator, + mutableSubscriptionContainer, subscriptionEndRejecter, subscriptionEndResolver, }; @@ -235,9 +296,7 @@ class RealTimeClient { resolver, serialized, }); - if (this.connection.readyState !== WEBSOCKET_READY_STATES.CONNECTING) { - this.createConnection(); - } + this.createConnection(); return messagePromise; } @@ -252,7 +311,6 @@ class RealTimeClient { * value of the resolved promise is a function that will end the subscription. */ startSubscription(entity, customerCode, filters, handlerFunc) { - const message = messages.creators.createSubscriptionRequest(entity, customerCode, filters); let subscriptionStartResolver; let subscriptionStartRejecter; const subscriptionStart = new Promise((resolve, reject) => { @@ -265,28 +323,57 @@ class RealTimeClient { subscriptionEndResolver = resolve; subscriptionEndRejecter = reject; }); - this.openRequests[message.request_id] = { - handler: handlerFunc, + const mutableSubscriptionContainer = { + subscriptionId: undefined, + }; + const messageCreator = () => + messages.creators.createSubscriptionRequest(entity, customerCode, filters); + return this.sendStartSubscriptionMessage( + messageCreator, + handlerFunc, + mutableSubscriptionContainer, subscriptionStartResolver, subscriptionStartRejecter, subscriptionEndResolver, subscriptionEndRejecter, - }; - return this.sendMessage(message) + ) .then(() => subscriptionStart) - .then(success => this.stopSubscription.bind(this, success.subscription_id, subscriptionEnd)); + .then(() => this.stopSubscription.bind(this, mutableSubscriptionContainer, subscriptionEnd)); + } + + sendStartSubscriptionMessage( + messageCreator, + handler, + mutableSubscriptionContainer, + onStartSuccess, + onStartFailure, + onEndSuccess, + onEndFailure, + ) { + const message = messageCreator(); + this.openRequests[message.request_id] = { + messageCreator, + handler, + mutableSubscriptionContainer, + subscriptionStartResolver: onStartSuccess, + subscriptionStartRejecter: onStartFailure, + subscriptionEndResolver: onEndSuccess, + subscriptionEndRejecter: onEndFailure, + }; + return this.sendMessage(message); } /** * Ends a subscription to the Track Real Time API. - * @param {string} subscriptionId Identity of the subscription + * @param {object} subscriptionContainer Object containing the current subscription ID + * @param {String} subscriptionContainer.subscriptionId Identity of the current subscription * @param {Promise} subscriptionEnd Promise that will be resolved upon end of the subscription * @returns {void} */ - stopSubscription(subscriptionId, subscriptionEnd) { + stopSubscription(subscriptionContainer, subscriptionEnd) { const subscriptionEndRequest = { type: messages.SUBSCRIPTION_END.REQUEST, - subscription_id: subscriptionId, + subscription_id: subscriptionContainer.subscriptionId, }; return this.sendMessage(subscriptionEndRequest) diff --git a/src/RealTimeClient.test.js b/src/RealTimeClient.test.js index 6928608..79a8ccc 100644 --- a/src/RealTimeClient.test.js +++ b/src/RealTimeClient.test.js @@ -9,19 +9,22 @@ chai.use(chaiAsPromised); describe('When creating a real time connection', () => { + let server; + beforeEach(() => { server = mock.getServer(); }); + afterEach(() => server.closeConnection()); + it('should connect only after REST client authenticates', () => { let resolveAuthentication; const mockClient = { authenticated: new Promise((resolve) => { resolveAuthentication = resolve; }), }; let wasConnectionOpened = false; - const server = mock.getServer(); server.on('connection', () => { wasConnectionOpened = true; }); - const rtClient = new RealTimeClient(mockClient, mock.options); + const realTimeClient = new RealTimeClient(mockClient, mock.options); - const messagePromise = rtClient.sendMessage({ foo: 'bar' }); + const messagePromise = realTimeClient.sendMessage({ foo: 'bar' }); wasConnectionOpened.should.equal(false); resolveAuthentication(); @@ -29,32 +32,27 @@ describe('When creating a real time connection', () => { mockClient.authenticated, messagePromise, ]) - .then(() => rtClient.closeConnection()) - .then(() => server.close()) - .then(() => wasConnectionOpened) - .should.eventually.become(true); + .then(() => server.closeConnection(realTimeClient)) + .then(() => wasConnectionOpened).should.eventually.become(true); }); it('should create at most one connection', () => { - const server = mock.getServer(); let numConnections = 0; server.on('connection', () => { numConnections += 1; }); - const rtClient = new RealTimeClient(mock.authenticatedClient, mock.options); + const realTimeClient = new RealTimeClient(mock.authenticatedClient, mock.options); - const message1 = rtClient.sendMessage({ foo: 'bar' }); - const message2 = rtClient.sendMessage({ bar: 'baz' }); - const message3 = rtClient.sendMessage({ baz: 'foo' }); + const message1 = realTimeClient.sendMessage({ foo: 'bar' }); + const message2 = realTimeClient.sendMessage({ bar: 'baz' }); + const message3 = realTimeClient.sendMessage({ baz: 'foo' }); return Promise.all([ message1, message2, message3, ]) - .then(() => rtClient.closeConnection()) - .then(() => server.close()) - .then(() => numConnections) - .should.eventually.become(1); + .then(() => server.closeConnection(realTimeClient)) + .then(() => numConnections).should.eventually.become(1); }); it('should queue messages to send while connecting', () => { @@ -64,7 +62,6 @@ describe('When creating a real time connection', () => { authenticated: new Promise((resolve) => { resolveAuthentication = resolve; }), }; let numMessagesReceived = 0; - const server = mock.getServer(); // no guarantee on when our webserver mock will fire its message, just that it will. // so let's create a promise of it and inspect later. @@ -77,11 +74,11 @@ describe('When creating a real time connection', () => { resolveGotAllMessages(); } }); - const rtClient = new RealTimeClient(mockClient, mock.options); + const realTimeClient = new RealTimeClient(mockClient, mock.options); - const message1 = rtClient.sendMessage({ foo: 'bar' }); - const message2 = rtClient.sendMessage({ bar: 'baz' }); - const message3 = rtClient.sendMessage({ baz: 'foo' }); + const message1 = realTimeClient.sendMessage({ foo: 'bar' }); + const message2 = realTimeClient.sendMessage({ bar: 'baz' }); + const message3 = realTimeClient.sendMessage({ baz: 'foo' }); resolveAuthentication(); @@ -91,39 +88,120 @@ describe('When creating a real time connection', () => { message3, gotAllMessages, ]) - .then(() => rtClient.closeConnection()) - .then(() => server.close()) - .should.be.fulfilled; + .then(() => server.closeConnection(realTimeClient)); }); }); describe('When the real time connection is disconnected', () => { + let server; + let realTimeClient; + beforeEach(() => { + server = mock.getServer(); + realTimeClient = new RealTimeClient(mock.authenticatedClient, { + ...mock.options, + reconnectTimeout: 0, + }); + }); + afterEach(() => { + server.closeConnection(realTimeClient); + }); + + const reconnect = () => { + server.closeConnection(); + server = mock.getServer(); + return Promise.resolve(); + }; + it('should reconnect and re-authenticate', () => { - const server = mock.getServer(); let numAuths = 0; - const rtClient = new RealTimeClient(mock.authenticatedClient, mock.options); - let resolveGotAllAuths; const gotAllAuths = new Promise((resolve) => { resolveGotAllAuths = resolve; }); - server.onTrackMessage(messages.AUTHENTICATION.REQUEST, () => { - numAuths += 1; - if (numAuths === 2) { - resolveGotAllAuths(); - } + let numMessages = 0; + let resolveGotAllMessages; + const gotAllMessages = new Promise((resolve) => { + resolveGotAllMessages = resolve; }); + const configureListener = () => { + server.onTrackMessage(messages.AUTHENTICATION.REQUEST, () => { + numAuths += 1; + if (numAuths === 2) { + resolveGotAllAuths(); + } + }); + server.onTrackMessage('TEST', () => { + numMessages += 1; + if (numMessages === 2) { + resolveGotAllMessages(); + } + }); + }; - const message1 = rtClient.sendMessage({ foo: 'bar' }) - // force a connection close. this will trigger another authentication - // request when it reconnects. - .then(() => rtClient.connection.close()); - const message2 = rtClient.sendMessage({ bar: 'baz' }); + configureListener(); + const message1 = realTimeClient.sendMessage({ type: 'TEST', id: 1 }) + .then(reconnect) + .then(configureListener); + const message2 = message1 + .then(() => realTimeClient.sendMessage({ type: 'TEST', id: 2 })); return Promise.all([ message1, message2, gotAllAuths, - ]) - .then(() => rtClient.closeConnection()) - .then(() => server.close()) - .should.be.fulfilled; + gotAllMessages, + ]); + }); + + it('should resubscribe to all subscriptions', () => { + let startRequests = 0; + let gotAllStartRequestsResolver; + const gotAllStartRequests = new Promise((resolve) => { + gotAllStartRequestsResolver = resolve; + }); + let lastSubscriptionResolver; + const lastSubscription = new Promise((resolve) => { + lastSubscriptionResolver = resolve; + }); + const configureListener = () => { + server.onTrackMessage(messages.SUBSCRIPTION_START.REQUEST, () => { + startRequests += 1; + if (startRequests === 2) { + gotAllStartRequestsResolver(); + } + }); + server.onTrackMessage(messages.SUBSCRIPTION_END.REQUEST, (message) => { + lastSubscriptionResolver(message.subscription_id); + }); + }; + configureListener(); + const observedSubscriptionIds = []; + const subscriptionStart = realTimeClient.startSubscription( + 'VEHICLES', + 'SYNC', + { + one: '1', + two: '2', + three: '3', + }, + (update) => { observedSubscriptionIds.push(update.subscription_id); }, + ); + + let subscriptionEnder; + const disruptConnection = subscriptionStart + .then((end) => { + subscriptionEnder = end; + }) + .then(reconnect) + .then(configureListener); + const endSubscription = gotAllStartRequests + .then(() => subscriptionEnder()); + const multipleSubscriptions = endSubscription + .then(() => observedSubscriptionIds); + return Promise.all([ + subscriptionStart, + disruptConnection, + endSubscription, + gotAllStartRequests, + multipleSubscriptions.should.eventually.become([1, 2]), + lastSubscription.should.eventually.become(2), + ]); }); }); diff --git a/src/mocks.js b/src/mocks.js index 09b0472..be22bbb 100644 --- a/src/mocks.js +++ b/src/mocks.js @@ -728,6 +728,7 @@ export const realTime = { authenticatedClient: { authenticated: Promise.resolve(), }, + subscriptionIdCounter: 0, /** * Returns a websocket server to which you can attach events. @@ -815,10 +816,9 @@ export const realTime = { }); server.emit('message', response); - let subscriptionIdCounter = 0; server.onTrackMessage(messages.SUBSCRIPTION_START.REQUEST, (request) => { - subscriptionIdCounter += 1; - const subscriptionId = subscriptionIdCounter; + realTime.subscriptionIdCounter += 1; + const subscriptionId = realTime.subscriptionIdCounter; server.emit('message', JSON.stringify({ type: messages.SUBSCRIPTION_START.SUCCESS,