-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change websocket client to handle multiple subscriptions
- Loading branch information
1 parent
4a6b875
commit a58a20a
Showing
1 changed file
with
81 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,100 +1,126 @@ | ||
import ReconnectingWebSocket from 'reconnecting-websocket' | ||
import { validate } from 'uuid' | ||
import { getWebSockURL } from './utils' | ||
import ReconnectingWebSocket from "reconnecting-websocket" | ||
import { validate } from "uuid" | ||
import { getWebSockURL } from "./utils" | ||
|
||
export class WebsocketClient { | ||
// wsType is either 'historicalStatus' or 'service' | ||
// pageType for 'service's are either 'camera', 'channel' or 'nightreport' | ||
constructor () { | ||
this.clientID = null | ||
this.ws = new ReconnectingWebSocket(getWebSockURL('ws/data')) | ||
constructor() { | ||
this.connectionID = null | ||
this.latestDataBySubscription = new Map() // Store latest data per subscription | ||
this.ws = new ReconnectingWebSocket(getWebSockURL("ws/data")) | ||
this.ws.onmessage = this.handleMessage.bind(this) | ||
this.ws.onclose = this.handleClose.bind(this) | ||
this.subscriptions = new Map() // To store multiple subscriptions | ||
} | ||
|
||
subscribe (wsType, pageType = null, location = null, camera = null, channel = null) { | ||
const pageID = [location, camera, channel].filter((el) => el).join('/') | ||
this.initMessage = this.#getInitMessage(wsType, pageType, pageID) | ||
this.wsEventName = this.#getWSEventName(wsType, pageType) | ||
// If clientID exists, send the initial message to subscribe to the new service | ||
if (this.clientID) { | ||
this.sendInitialMessage() | ||
subscribe( | ||
subscriptionType, | ||
servicePageType = null, | ||
location = null, | ||
camera = null, | ||
channel = null | ||
) { | ||
const pageID = [location, camera, channel].filter((el) => el).join("/") | ||
const subscriptionPayload = this.#getSubscriptionPayload( | ||
subscriptionType, | ||
servicePageType, | ||
pageID | ||
) | ||
const eventType = this.#getSubscriptionEventType( | ||
subscriptionType, | ||
servicePageType | ||
) | ||
|
||
// Add the subscription to the Map | ||
this.subscriptions.set(eventType, { subscriptionPayload }) | ||
|
||
if (this.connectionID) { | ||
this.sendSubscriptionMessages() | ||
} | ||
} | ||
|
||
#getInitMessage (wsType, pageType, pageID) { | ||
let messageJson | ||
if (wsType === 'historicalStatus') { | ||
messageJson = { messageType: wsType } | ||
#getSubscriptionPayload(subscriptionType, servicePageType, pageID) { | ||
let payload | ||
if (subscriptionType === "historicalStatus") { | ||
payload = { messageType: subscriptionType } | ||
} else { | ||
const message = [pageType, pageID].join(' ') | ||
messageJson = { messageType: 'service', message } | ||
const message = [servicePageType, pageID].join(" ") | ||
payload = { messageType: "service", message } | ||
} | ||
return messageJson | ||
return payload | ||
} | ||
|
||
#getWSEventName (wsType, pageType) { | ||
let eventName | ||
if (wsType === 'historicalStatus') { | ||
eventName = wsType | ||
} else { | ||
eventName = pageType | ||
} | ||
return eventName | ||
#getSubscriptionEventType(subscriptionType, servicePageType) { | ||
return subscriptionType === "historicalStatus" | ||
? subscriptionType | ||
: servicePageType | ||
} | ||
|
||
handleClose (e) { | ||
console.log('Lost services websocket connection. Retrying') | ||
handleClose(e) { | ||
console.log("Lost services websocket connection. Retrying") | ||
} | ||
|
||
handleMessage (messageEvent) { | ||
handleMessage(messageEvent) { | ||
console.debug(messageEvent) | ||
if (!this.clientID) { | ||
const id = this.setClientID(messageEvent.data) | ||
if (!this.connectionID) { | ||
const id = this.setConnectionID(messageEvent.data) | ||
if (id) { | ||
this.sendInitialMessage() | ||
this.sendSubscriptionMessages() | ||
return | ||
} | ||
} | ||
|
||
let data | ||
try { | ||
data = JSON.parse(messageEvent.data) | ||
// if the connection closes in the meantime | ||
// the server might send a new client ID | ||
// so catch this in case | ||
} catch (error) { | ||
const valid = this.setClientID(messageEvent.data) | ||
const valid = this.setConnectionID(messageEvent.data) | ||
if (valid) { | ||
this.sendInitialMessage() | ||
this.sendSubscriptionMessages() | ||
return | ||
} else { | ||
console.debug('Couldn\'t parse message:', messageEvent.data) | ||
console.debug("Couldn't parse message:", messageEvent.data) | ||
} | ||
} | ||
if (!data.dataType || !Object.hasOwn(data, 'payload')) { | ||
|
||
if (!data.dataType || !Object.hasOwn(data, "payload")) { | ||
return | ||
} | ||
const detail = { | ||
dataType: data.dataType, | ||
data: data.payload, | ||
datestamp: data.datestamp | ||
|
||
// Store the latest data for this specific subscription | ||
const subscription = this.subscriptions.get(data.dataType) | ||
if (subscription) { | ||
const detail = { | ||
dataType: data.dataType, | ||
data: data.payload, | ||
datestamp: data.datestamp, | ||
} | ||
this.latestDataBySubscription.set(data.dataType, detail) // Store the latest data by subscription type | ||
window.dispatchEvent(new CustomEvent(data.dataType, { detail })) | ||
} | ||
window.dispatchEvent(new CustomEvent(this.wsEventName, { detail })) | ||
} | ||
|
||
setClientID (messageData) { | ||
setConnectionID(messageData) { | ||
const id = messageData | ||
if (validate(id)) { | ||
this.clientID = id | ||
console.debug(`Received client ID: ${id}`) | ||
this.connectionID = id | ||
console.debug(`Received connection ID: ${id}`) | ||
return id | ||
} | ||
return null | ||
} | ||
|
||
sendInitialMessage () { | ||
const message = this.initMessage | ||
message.clientID = this.clientID | ||
this.ws.send(JSON.stringify(message)) | ||
sendSubscriptionMessages() { | ||
for (const { subscriptionPayload } of this.subscriptions.values()) { | ||
const message = { | ||
...subscriptionPayload, | ||
connectionID: this.connectionID, | ||
} | ||
this.ws.send(JSON.stringify(message)) | ||
} | ||
} | ||
|
||
// New method to retrieve the latest detail for a specific subscription | ||
getLatestDataForSubscription(subscriptionType) { | ||
return this.latestDataBySubscription.get(subscriptionType) || null | ||
} | ||
} |