Skip to content

Commit

Permalink
Added communication library (#283)
Browse files Browse the repository at this point in the history
* Added communication library

* Clean code and added init comms

* Additional cleanup

* Fix wrong state checked

* Added QoS for subscription

* Added authentication

* Prepare for connection with private broker

* Update index.ts

* Remove private and secure init

* Remove nonsecure mqtt and added get user id

* Address issue raised in PR

* Update MqttController.ts

* Improved comments

* Check interval !== undefined
  • Loading branch information
8kdesign authored Mar 26, 2024
1 parent 3779341 commit ce5b07e
Show file tree
Hide file tree
Showing 10 changed files with 1,117 additions and 22 deletions.
3 changes: 3 additions & 0 deletions modules.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,8 @@
"tabs": [
"physics_2d"
]
},
"communication": {
"tabs": []
}
}
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,12 @@
"ace-builds": "^1.25.1",
"classnames": "^2.3.1",
"dayjs": "^1.10.4",
"events": "^3.3.0",
"gl-matrix": "^3.3.0",
"js-slang": "^1.0.48",
"lodash": "^4.17.21",
"mqtt": "^4.3.7",
"os": "^0.1.2",
"patch-package": "^6.5.1",
"phaser": "^3.54.0",
"plotly.js-dist": "^2.17.1",
Expand All @@ -120,7 +123,9 @@
"save-file": "^2.3.1",
"source-academy-utils": "^1.0.0",
"source-academy-wabt": "^1.0.4",
"tslib": "^2.3.1"
"tslib": "^2.3.1",
"uniqid": "^5.4.0",
"url": "^0.11.3"
},
"jest": {
"projects": [
Expand Down
200 changes: 200 additions & 0 deletions src/bundles/communication/Communications.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import context from 'js-slang/context';
import { MultiUserController } from './MultiUserController';
import { GlobalStateController } from './GlobalStateController';
import { RpcController } from './RpcController';

class CommunicationModuleState {
multiUser: MultiUserController;
globalState: GlobalStateController | null = null;
rpc: RpcController | null = null;

constructor(address: string, port: number, user: string, password: string) {
const multiUser = new MultiUserController();
multiUser.setupController(address, port, user, password);
this.multiUser = multiUser;
}
}

/**
* Initializes connection with MQTT broker.
* Currently only supports WebSocket.
*
* @param address Address of broker.
* @param port WebSocket port number for broker.
* @param user Username of account, use empty string if none.
* @param password Password of account, use empty string if none.
*/
export function initCommunications(
address: string,
port: number,
user: string,
password: string,
) {
if (getModuleState() instanceof CommunicationModuleState) {
return;
}
const newModuleState = new CommunicationModuleState(
address,
port,
user,
password,
);
context.moduleContexts.communication.state = newModuleState;
}

function getModuleState() {
return context.moduleContexts.communication.state;
}

// Loop

let interval: number | undefined;

/**
* Keeps the program running so that messages can come in.
*/
export function keepRunning() {
interval = window.setInterval(() => {}, 20000);
}

/**
* Removes interval that keeps the program running.
*/
export function stopRunning() {
if (interval !== undefined) {
window.clearInterval(interval);
interval = undefined;
}
}

// Global State

/**
* Initializes global state.
*
* @param topicHeader MQTT topic to use for global state.
* @param callback Callback to receive updates of global state.
*/
export function initGlobalState(
topicHeader: string,
callback: (state: any) => void,
) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
if (moduleState.globalState instanceof GlobalStateController) {
return;
}
moduleState.globalState = new GlobalStateController(
topicHeader,
moduleState.multiUser,
callback,
);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Obtains the current global state.
*
* @returns Current global state.
*/
export function getGlobalState() {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
return moduleState.globalState?.globalState;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Broadcasts the new states to all devices.
* Has ability to modify only part of the JSON state.
*
* @param path Path within the json state.
* @param updatedState Replacement value at specified path.
*/
export function updateGlobalState(path: string, updatedState: any) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.globalState?.updateGlobalState(path, updatedState);
return;
}
throw new Error('Error: Communication module not initialized.');
}

// Rpc

/**
* Initializes RPC.
*
* @param topicHeader MQTT topic to use for rpc.
* @param userId Identifier for this user, set undefined to generate a random ID.
*/
export function initRpc(topicHeader: string, userId?: string) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc = new RpcController(
topicHeader,
moduleState.multiUser,
userId,
);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Obtains the user's ID.
*
* @returns String for user ID.
*/
export function getUserId(): string {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
let userId = moduleState.rpc?.getUserId();
if (userId) {
return userId;
}
throw new Error('Error: UserID not found.');
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Exposes the specified function to other users.
* Other users can use "callFunction" to call this function.
*
* @param name Identifier for the function.
* @param func Function to call when request received.
*/
export function expose(name: string, func: (...args: any[]) => any) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc?.expose(name, func);
return;
}
throw new Error('Error: Communication module not initialized.');
}

/**
* Calls a function exposed by another user.
*
* @param receiver Identifier for the user whose function we want to call.
* @param name Identifier for function to call.
* @param args Array of arguments to pass into the function.
* @param callback Callback with return value.
*/
export function callFunction(
receiver: string,
name: string,
args: any[],
callback: (args: any[]) => void,
) {
const moduleState = getModuleState();
if (moduleState instanceof CommunicationModuleState) {
moduleState.rpc?.callFunction(receiver, name, args, callback);
return;
}
throw new Error('Error: Communication module not initialized.');
}
125 changes: 125 additions & 0 deletions src/bundles/communication/GlobalStateController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { type MultiUserController } from './MultiUserController';

/**
* Controller for maintaining a global state across all devices.
* Depends on MQTT implementation in MultiUserController.
*
* @param topicHeader Identifier for all global state messages, must not include '/'.
* @param multiUser Instance of multi user controller.
* @param callback Callback called when the global state changes.
*/
export class GlobalStateController {
private topicHeader: string;
private multiUser: MultiUserController;
private callback: (state: any) => void;
globalState: any;

constructor(
topicHeader: string,
multiUser: MultiUserController,
callback: (state: any) => void,
) {
this.topicHeader = topicHeader;
this.multiUser = multiUser;
this.callback = callback;
this.setupGlobalState();
}

/**
* Sets up callback for global state messages.
* Parses received message and stores it as global state.
*/
private setupGlobalState() {
if (this.topicHeader.length <= 0) return;
this.multiUser.addMessageCallback(this.topicHeader, (topic, message) => {
const shortenedTopic = topic.substring(
this.topicHeader.length,
topic.length,
);
this.parseGlobalStateMessage(shortenedTopic, message);
});
}

/**
* Parses the message received via MQTT and updates the global state.
*
* @param shortenedTopic Path of JSON branch.
* @param message New value to set.
*/
public parseGlobalStateMessage(shortenedTopic: string, message: string) {
let preSplitTopic = shortenedTopic.trim();
if (preSplitTopic.length === 0) {
try {
this.setGlobalState(JSON.parse(message));
} catch {
this.setGlobalState(undefined);
}
return;
}
if (!preSplitTopic.startsWith('/')) {
preSplitTopic = `/${preSplitTopic}`;
}
const splitTopic = preSplitTopic.split('/');
try {
let newGlobalState = { ...this.globalState };
if (
this.globalState instanceof Array ||

Check warning on line 66 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / deploy

'||' should be placed at the beginning of the line
typeof this.globalState === 'string'
) {
newGlobalState = {};
}
let currentJson = newGlobalState;
for (let i = 1; i < splitTopic.length - 1; i++) {
const subTopic = splitTopic[i];
if (
!(currentJson[subTopic] instanceof Object) ||

Check warning on line 75 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / deploy

'||' should be placed at the beginning of the line
currentJson[subTopic] instanceof Array ||

Check warning on line 76 in src/bundles/communication/GlobalStateController.ts

View workflow job for this annotation

GitHub Actions / deploy

'||' should be placed at the beginning of the line
typeof currentJson[subTopic] === 'string'
) {
currentJson[subTopic] = {};
}
currentJson = currentJson[subTopic];
}
if (message === undefined || message.length === 0) {
delete currentJson[splitTopic[splitTopic.length - 1]];
} else {
const jsonMessage = JSON.parse(message);
currentJson[splitTopic[splitTopic.length - 1]] = jsonMessage;
}
this.setGlobalState(newGlobalState);
} catch (error) {
console.log('Failed to parse message', error);
}
}

/**
* Sets the new global state and calls the callback to notify changes.
*
* @param newState New state received.
*/
private setGlobalState(newState: any) {
this.globalState = newState;
this.callback(newState);
}

/**
* Broadcasts the new states to all devices.
* Has ability to modify only part of the JSON state.
*
* @param path Path within the json state.
* @param updatedState Replacement value at specified path.
*/
public updateGlobalState(path: string, updatedState: any) {
if (this.topicHeader.length === 0) return;
let topic = this.topicHeader;
if (path.length !== 0 && !path.startsWith('/')) {
topic += '/';
}
topic += path;
this.multiUser.controller?.publish(
topic,
JSON.stringify(updatedState),
false,
);
}
}
Loading

0 comments on commit ce5b07e

Please sign in to comment.