Skip to content

Commit

Permalink
Support telemetry batching and move WebSocket handling to worker (#7391)
Browse files Browse the repository at this point in the history
* Support subscription batching from API, Tables, and Plots

* Added batching worker

* Added configurable batch size and throttling rate

* Support batch size based throttling

* Default to latest strategy

* Don't hide original error

* Added copyright statement

* Renamed BatchingWebSocketProvider to BatchingWebSocket

* Adding docs

* renamed class. changed throttling strategy to be driven by the main thread

* Renamed classes

* Added more documentation

* Fixed broken tests

* Addressed review comments

* Clean up and reconnect on websocket close

* Better management of subscription strategies

* Add tests to catch edge cases where two subscribers request different strategies

* Ensure callbacks are invoked with telemetry in the requested format

* Remove console out. Oops

* Fix linting errors
  • Loading branch information
akhenry authored Jan 29, 2024
1 parent 0eea2e0 commit 5c21c34
Show file tree
Hide file tree
Showing 9 changed files with 847 additions and 62 deletions.
3 changes: 2 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@
"WCAG",
"stackedplot",
"Andale",
"checksnapshots"
"checksnapshots",
"specced"
],
"dictionaries": ["npm", "softwareTerms", "node", "html", "css", "bash", "en_US"],
"ignorePaths": [
Expand Down
194 changes: 194 additions & 0 deletions src/api/telemetry/BatchingWebSocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*****************************************************************************
* Open MCT Web, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT Web includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import installWorker from './WebSocketWorker.js';
const DEFAULT_RATE_MS = 1000;
/**
* Describes the strategy to be used when batching WebSocket messages
*
* @typedef BatchingStrategy
* @property {Function} shouldBatchMessage a function that accepts a single
* argument - the raw message received from the websocket. Every message
* received will be evaluated against this function so it should be performant.
* Note also that this function is executed in a worker, so it must be
* completely self-contained with no external dependencies. The function
* should return `true` if the message should be batched, and `false` if not.
* @property {Function} getBatchIdFromMessage a function that accepts a
* single argument - the raw message received from the websocket. Only messages
* where `shouldBatchMessage` has evaluated to true will be passed into this
* function. The function should return a unique value on which to batch the
* messages. For example a telemetry, channel, or parameter identifier.
*/
/**
* Provides a reliable and convenient WebSocket abstraction layer that handles
* a lot of boilerplate common to managing WebSocket connections such as:
* - Establishing a WebSocket connection to a server
* - Reconnecting on error, with a fallback strategy
* - Queuing messages so that clients can send messages without concern for the current
* connection state of the WebSocket.
*
* The WebSocket that it manages is based in a dedicated worker so that network
* concerns are not handled on the main event loop. This allows for performant receipt
* and batching of messages without blocking either the UI or server.
*
* @memberof module:openmct.telemetry
*/
class BatchingWebSocket extends EventTarget {
#worker;
#openmct;
#showingRateLimitNotification;
#rate;

constructor(openmct) {
super();
// Install worker, register listeners etc.
const workerFunction = `(${installWorker.toString()})()`;
const workerBlob = new Blob([workerFunction]);
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
this.#worker = new Worker(workerUrl);
this.#openmct = openmct;
this.#showingRateLimitNotification = false;
this.#rate = DEFAULT_RATE_MS;

const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
this.#worker.addEventListener('message', routeMessageToHandler);
openmct.on(
'destroy',
() => {
this.disconnect();
URL.revokeObjectURL(workerUrl);
},
{ once: true }
);
}

/**
* Will establish a WebSocket connection to the provided url
* @param {string} url The URL to connect to
*/
connect(url) {
this.#worker.postMessage({
type: 'connect',
url
});

this.#readyForNextBatch();
}

#readyForNextBatch() {
this.#worker.postMessage({
type: 'readyForNextBatch'
});
}

/**
* Send a message to the WebSocket.
* @param {any} message The message to send. Can be any type supported by WebSockets.
* See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data
*/
sendMessage(message) {
this.#worker.postMessage({
type: 'message',
message
});
}

/**
* Set the strategy used to both decide which raw messages to batch, and how to group
* them.
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
* raw messages from the WebSocket.
*/
setBatchingStrategy(strategy) {
const serializedStrategy = {
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};

this.#worker.postMessage({
type: 'setBatchingStrategy',
serializedStrategy
});
}

/**
* When using batching, sets the rate at which batches of messages are released.
* @param {Number} rate the amount of time to wait, in ms, between batches.
*/
setRate(rate) {
this.#rate = rate;
}

/**
* @param {Number} maxBatchSize the maximum length of a batch of messages. For example,
* the maximum number of telemetry values to batch before dropping them
* Note that this is a fail-safe that is only invoked if performance drops to the
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
* most recent telemetry. The user will be informed that telemetry has been dropped.
*
* This should be set appropriately for the expected data rate. eg. If telemetry
* is received at 10Hz for each telemetry point, then a minimal combination of batch
* size and rate is 10 and 1000 respectively. Ideally you would add some margin, so
* 15 would probably be a better batch size.
*/
setMaxBatchSize(maxBatchSize) {
this.#worker.postMessage({
type: 'setMaxBatchSize',
maxBatchSize
});
}

/**
* Disconnect the associated WebSocket. Generally speaking there is no need to call
* this manually.
*/
disconnect() {
this.#worker.postMessage({
type: 'disconnect'
});
}

#routeMessageToHandler(message) {
if (message.data.type === 'batch') {
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
const notification = this.#openmct.notifications.alert(
'Telemetry dropped due to client rate limiting.',
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
);
this.#showingRateLimitNotification = true;
notification.once('minimized', () => {
this.#showingRateLimitNotification = false;
});
}
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
setTimeout(() => {
this.#readyForNextBatch();
}, this.#rate);
} else if (message.data.type === 'message') {
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));
} else {
throw new Error(`Unknown message type: ${message.data.type}`);
}
}
}

export default BatchingWebSocket;
123 changes: 105 additions & 18 deletions src/api/telemetry/TelemetryAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import objectUtils from 'objectUtils';

import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
import BatchingWebSocket from './BatchingWebSocket.js';
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
import TelemetryCollection from './TelemetryCollection.js';
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
Expand Down Expand Up @@ -54,13 +55,40 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js';
* @memberof module:openmct.TelemetryAPI~
*/

/**
* Describes and bounds requests for telemetry data.
*
* @typedef TelemetrySubscriptionOptions
* @property {String} [strategy] symbolic identifier directing providers on how
* to handle telemetry subscriptions. The default behavior is 'latest' which will
* always return a single telemetry value with each callback, and in the event
* of throttling will always prioritize the latest data, meaning intermediate
* data will be skipped. Alternatively, the `batch` strategy can be used, which
* will return all telemetry values since the last callback. This strategy is
* useful for cases where intermediate data is important, such as when
* rendering a telemetry plot or table. If `batch` is specified, the subscription
* callback will be invoked with an Array.
*
* @memberof module:openmct.TelemetryAPI~
*/

const SUBSCRIBE_STRATEGY = {
LATEST: 'latest',
BATCH: 'batch'
};

/**
* Utilities for telemetry
* @interface TelemetryAPI
* @memberof module:openmct
*/
export default class TelemetryAPI {
#isGreedyLAD;
#subscribeCache;

get SUBSCRIBE_STRATEGY() {
return SUBSCRIBE_STRATEGY;
}

constructor(openmct) {
this.openmct = openmct;
Expand All @@ -78,6 +106,8 @@ export default class TelemetryAPI {
this.valueFormatterCache = new WeakMap();
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
this.#isGreedyLAD = true;
this.BatchingWebSocket = BatchingWebSocket;
this.#subscribeCache = {};
}

abortAllRequests() {
Expand Down Expand Up @@ -378,54 +408,111 @@ export default class TelemetryAPI {
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
* @param {module:openmct.DomainObject} domainObject the object
* which has associated telemetry
* @param {TelemetryRequestOptions} options configuration items for subscription
* @param {TelemetrySubscriptionOptions} options configuration items for subscription
* @param {Function} callback the callback to invoke with new data, as
* it becomes available
* @returns {Function} a function which may be called to terminate
* the subscription
*/
subscribe(domainObject, callback, options) {
subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) {
const requestedStrategy = options.strategy || SUBSCRIBE_STRATEGY.LATEST;

if (domainObject.type === 'unknown') {
return () => {};
}

const provider = this.findSubscriptionProvider(domainObject);
const provider = this.findSubscriptionProvider(domainObject, options);
const supportsBatching =
Boolean(provider?.supportsBatching) && provider?.supportsBatching(domainObject, options);

if (!this.subscribeCache) {
this.subscribeCache = {};
if (!this.#subscribeCache) {
this.#subscribeCache = {};
}

const keyString = objectUtils.makeKeyString(domainObject.identifier);
let subscriber = this.subscribeCache[keyString];
const supportedStrategy = supportsBatching ? requestedStrategy : SUBSCRIBE_STRATEGY.LATEST;
// Override the requested strategy with the strategy supported by the provider
const optionsWithSupportedStrategy = {
...options,
strategy: supportedStrategy
};
// If batching is supported, we need to cache a subscription for each strategy -
// latest and batched.
const cacheKey = `${keyString}:${supportedStrategy}`;
let subscriber = this.#subscribeCache[cacheKey];

if (!subscriber) {
subscriber = this.subscribeCache[keyString] = {
callbacks: [callback]
subscriber = this.#subscribeCache[cacheKey] = {
latestCallbacks: [],
batchCallbacks: []
};
if (provider) {
subscriber.unsubscribe = provider.subscribe(
domainObject,
function (value) {
subscriber.callbacks.forEach(function (cb) {
cb(value);
});
},
options
invokeCallbackWithRequestedStrategy,
optionsWithSupportedStrategy
);
} else {
subscriber.unsubscribe = function () {};
}
}

if (requestedStrategy === SUBSCRIBE_STRATEGY.BATCH) {
subscriber.batchCallbacks.push(callback);
} else {
subscriber.callbacks.push(callback);
subscriber.latestCallbacks.push(callback);
}

// Guarantees that view receive telemetry in the expected form
function invokeCallbackWithRequestedStrategy(data) {
invokeCallbacksWithArray(data, subscriber.batchCallbacks);
invokeCallbacksWithSingleValue(data, subscriber.latestCallbacks);
}

function invokeCallbacksWithArray(data, batchCallbacks) {
//
if (data === undefined || data === null || data.length === 0) {
throw new Error(
'Attempt to invoke telemetry subscription callback with no telemetry datum'
);
}

if (!Array.isArray(data)) {
data = [data];
}

batchCallbacks.forEach((cb) => {
cb(data);
});
}

function invokeCallbacksWithSingleValue(data, latestCallbacks) {
if (Array.isArray(data)) {
data = data[data.length - 1];
}

if (data === undefined || data === null) {
throw new Error(
'Attempt to invoke telemetry subscription callback with no telemetry datum'
);
}

latestCallbacks.forEach((cb) => {
cb(data);
});
}

return function unsubscribe() {
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
subscriber.latestCallbacks = subscriber.latestCallbacks.filter(function (cb) {
return cb !== callback;
});
if (subscriber.callbacks.length === 0) {
subscriber.batchCallbacks = subscriber.batchCallbacks.filter(function (cb) {
return cb !== callback;
});

if (subscriber.latestCallbacks.length === 0 && subscriber.batchCallbacks.length === 0) {
subscriber.unsubscribe();
delete this.subscribeCache[keyString];
delete this.#subscribeCache[cacheKey];
}
}.bind(this);
}
Expand Down
Loading

0 comments on commit 5c21c34

Please sign in to comment.