Skip to content

Commit

Permalink
feat(realtime): Handled resubscribing on reconnect (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
thzinc authored Nov 29, 2017
1 parent fdd0244 commit 878bf7e
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 65 deletions.
129 changes: 108 additions & 21 deletions src/RealTimeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { DEFAULT_TRACK_API_HOST } from './Client';
const WEBSOCKET_READY_STATES = {
CONNECTING: 0,
OPEN: 1,
CLOSING: 2,
CLOSED: 3,
};

/**
Expand All @@ -19,6 +21,8 @@ class RealTimeClient {
* @param {Object} options Options for using the client to connect to the Track API
* @param {boolean} [options.reconnectOnClose] Whether to re-establish a connection when the
* current connection is closed. Defaults to true.
* @param {number} [options.reconnectTimeout] Number of milliseconds to wait before reattempting
* a connection
* @param {string} [options.realTimeUri] The URI for the WebSocket connection to the RealTime API.
* Defaults to production.
*/
Expand All @@ -43,6 +47,8 @@ class RealTimeClient {

// dummy object for safe calling semantics later
this.connection = {};
this.initializing = false;
this.timeoutHandles = [];

this.queuedMessages = [];

Expand All @@ -64,6 +70,7 @@ class RealTimeClient {
closeConnection() {
const { reconnectOnClose } = this.options;
this.options.reconnectOnClose = false;
this.timeoutHandles.forEach(clearTimeout);
if (this.connection && typeof this.connection.close === 'function') {
this.connection.close();
}
Expand All @@ -73,14 +80,29 @@ class RealTimeClient {
/**
* Creates a new WebSocket connection to the Track Real Time API and sends authentication
* messages.
* @param {Function} [onInitialize] Function to execute upon initializing a connection
* @returns {void}
*/
createConnection() {
const connection = new WebSocket(this.options.realTimeUri);
connection.onopen = this.onConnectionOpened;
connection.onclose = this.onConnectionClosed;
connection.onmessage = this.onMessageReceived;
this.connection = connection;
createConnection(onInitialize) {
if (this.initializing) return;
const readyState = this.connection.readyState || WEBSOCKET_READY_STATES.CLOSED;
switch (readyState) {
case WEBSOCKET_READY_STATES.OPEN:
case WEBSOCKET_READY_STATES.CONNECTING:
case WEBSOCKET_READY_STATES.CLOSING:
return;
default:
{
this.initializing = true;
const connection = new WebSocket(this.options.realTimeUri);
connection.onopen = this.onConnectionOpened;
connection.onclose = this.onConnectionClosed;
connection.onmessage = this.onMessageReceived;
this.connection = connection;
if (onInitialize) onInitialize();
}
break;
}
}

/**
Expand All @@ -89,11 +111,43 @@ class RealTimeClient {
* @returns {void}
*/
onConnectionClosed() {
if (this.options.reconnectOnClose) {
const {
reconnectOnClose = false,
reconnectTimeout = 0,
} = this.options;

if (reconnectOnClose) {
// the WebSocket API doesn't offer a way to re-open a closed connection.
// remove our connection and create a new one.
delete this.connection;
this.createConnection();
const reconnect = () => {
this.createConnection(() => Object.keys(this.subscriptions)
.map(subscriptionId => ({
...this.subscriptions[subscriptionId],
subscriptionId,
}))
.forEach(({
subscriptionId,
handler,
messageCreator,
mutableSubscriptionContainer,
subscriptionEndRejecter,
subscriptionEndResolver,
}) => {
this.sendStartSubscriptionMessage(
messageCreator,
handler,
mutableSubscriptionContainer,
() => { },
() => new Error('Could not resume subscription'),
subscriptionEndResolver,
subscriptionEndRejecter,
)
.then(() => {
delete this.subscriptions[subscriptionId];
});
}));
};
this.timeoutHandles.push(setTimeout(reconnect, reconnectTimeout));
}
}

Expand All @@ -103,6 +157,7 @@ class RealTimeClient {
* @returns {void}
*/
onConnectionOpened() {
this.initializing = false;
this.sendAuthentication()
.then(() => {
this.queuedMessages.forEach((queuedMessage) => {
Expand Down Expand Up @@ -135,6 +190,8 @@ class RealTimeClient {
{
const {
handler,
messageCreator,
mutableSubscriptionContainer,
subscriptionStartResolver,
subscriptionEndRejecter,
subscriptionEndResolver,
Expand All @@ -147,8 +204,12 @@ class RealTimeClient {
queuedMessages.forEach(qm => handler(qm));
}

mutableSubscriptionContainer.subscriptionId = message.subscription_id;

this.subscriptions[message.subscription_id] = {
handler,
messageCreator,
mutableSubscriptionContainer,
subscriptionEndRejecter,
subscriptionEndResolver,
};
Expand Down Expand Up @@ -235,9 +296,7 @@ class RealTimeClient {
resolver,
serialized,
});
if (this.connection.readyState !== WEBSOCKET_READY_STATES.CONNECTING) {
this.createConnection();
}
this.createConnection();
return messagePromise;
}

Expand All @@ -252,7 +311,6 @@ class RealTimeClient {
* value of the resolved promise is a function that will end the subscription.
*/
startSubscription(entity, customerCode, filters, handlerFunc) {
const message = messages.creators.createSubscriptionRequest(entity, customerCode, filters);
let subscriptionStartResolver;
let subscriptionStartRejecter;
const subscriptionStart = new Promise((resolve, reject) => {
Expand All @@ -265,28 +323,57 @@ class RealTimeClient {
subscriptionEndResolver = resolve;
subscriptionEndRejecter = reject;
});
this.openRequests[message.request_id] = {
handler: handlerFunc,
const mutableSubscriptionContainer = {
subscriptionId: undefined,
};
const messageCreator = () =>
messages.creators.createSubscriptionRequest(entity, customerCode, filters);
return this.sendStartSubscriptionMessage(
messageCreator,
handlerFunc,
mutableSubscriptionContainer,
subscriptionStartResolver,
subscriptionStartRejecter,
subscriptionEndResolver,
subscriptionEndRejecter,
};
return this.sendMessage(message)
)
.then(() => subscriptionStart)
.then(success => this.stopSubscription.bind(this, success.subscription_id, subscriptionEnd));
.then(() => this.stopSubscription.bind(this, mutableSubscriptionContainer, subscriptionEnd));
}

sendStartSubscriptionMessage(
messageCreator,
handler,
mutableSubscriptionContainer,
onStartSuccess,
onStartFailure,
onEndSuccess,
onEndFailure,
) {
const message = messageCreator();
this.openRequests[message.request_id] = {
messageCreator,
handler,
mutableSubscriptionContainer,
subscriptionStartResolver: onStartSuccess,
subscriptionStartRejecter: onStartFailure,
subscriptionEndResolver: onEndSuccess,
subscriptionEndRejecter: onEndFailure,
};
return this.sendMessage(message);
}

/**
* Ends a subscription to the Track Real Time API.
* @param {string} subscriptionId Identity of the subscription
* @param {object} subscriptionContainer Object containing the current subscription ID
* @param {String} subscriptionContainer.subscriptionId Identity of the current subscription
* @param {Promise} subscriptionEnd Promise that will be resolved upon end of the subscription
* @returns {void}
*/
stopSubscription(subscriptionId, subscriptionEnd) {
stopSubscription(subscriptionContainer, subscriptionEnd) {
const subscriptionEndRequest = {
type: messages.SUBSCRIPTION_END.REQUEST,
subscription_id: subscriptionId,
subscription_id: subscriptionContainer.subscriptionId,
};

return this.sendMessage(subscriptionEndRequest)
Expand Down
Loading

0 comments on commit 878bf7e

Please sign in to comment.