From a2a0d3741475b3e2c4f2d27142e8b1a7852db5bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Iv=C3=A1n=20Vieitez=20Parra?= <3857362+corrideat@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:34:36 +0000 Subject: [PATCH] Server push events --- backend/pubsub.js | 23 +++++++--- backend/push.js | 40 ++++++++++++++++- backend/server.js | 52 ++++++++++++++++++++++ backend/vapid.js | 6 ++- frontend/controller/service-worker.js | 8 ++-- frontend/controller/serviceworkers/push.js | 11 +++-- 6 files changed, 122 insertions(+), 18 deletions(-) diff --git a/backend/pubsub.js b/backend/pubsub.js index 11d317fc6d..984dadaedf 100644 --- a/backend/pubsub.js +++ b/backend/pubsub.js @@ -21,6 +21,7 @@ import type { } from '~/shared/pubsub.js' import type { JSONType, JSONObject } from '~/shared/types.js' +import { postEvent } from './push.js' const { bold } = require('chalk') const WebSocket = require('ws') @@ -105,7 +106,7 @@ export function createServer (httpServer: Object, options?: Object = {}): Object server.channels = new Set() server.customServerEventHandlers = { ...options.serverHandlers } server.customSocketEventHandlers = { ...options.socketHandlers } - server.messageHandlers = { ...defaultMessageHandlers, ...options.messageHandlers } + server.customMessageHandlers = { ...options.messageHandlers } server.pingIntervalID = undefined server.subscribersByChannelID = Object.create(null) server.pushSubscriptions = Object.create(null) @@ -231,14 +232,16 @@ const defaultSocketEventHandlers = { } // The socket can be marked as active since it just received a message. socket.activeSinceLastPing = true - const handler = server.messageHandlers[msg.type] + const defaultHandler = defaultMessageHandlers[msg.type] + const customHandler = server.customMessageHandlers[msg.type] - if (handler) { + if (defaultHandler || customHandler) { try { - handler.call(socket, msg) + defaultHandler?.call(socket, msg) + customHandler?.call(socket, msg) } catch (error) { // Log the error message and stack trace but do not send it to the client. - log.error(error, 'onMesage') + log.error(error, 'onMessage') server.rejectMessageAndTerminateSocket(msg, socket) } } else { @@ -323,8 +326,16 @@ const publicMethods = { const server = this for (const client of to || server.clients) { + const msg = typeof message === 'string' ? message : JSON.stringify(message) + if (client.endpoint) { + console.error('@@@@POSTING PUSH EVENT') + postEvent(client, msg).catch(e => { + console.error(e, 'Error posting push notification') + }) + continue + } if (client.readyState === WebSocket.OPEN && client !== except) { - client.send(typeof message === 'string' ? message : JSON.stringify(message)) + client.send(msg) } } }, diff --git a/backend/push.js b/backend/push.js index 1639e22a83..1f020247ad 100644 --- a/backend/push.js +++ b/backend/push.js @@ -100,6 +100,12 @@ export const subscriptionInfoWrapper = (subcriptionId: string, subscriptionInfo: return resultPromise } })() + }, + 'sockets': { + value: new Set() + }, + 'subscriptions': { + value: new Set() } }) @@ -115,15 +121,47 @@ export const pushServerActionhandlers: any = { }, async [PUSH_SERVER_ACTION_TYPE.STORE_SUBSCRIPTION] (payload) { const socket = this + const { server } = socket const subscription = payload const subscriptionId = await getSubscriptionId(subscription) - socket.server.pushSubscriptions[subscriptionId] = subscriptionInfoWrapper(subscriptionId, subscription) + if (!server.pushSubscriptions[subscriptionId]) { + server.pushSubscriptions[subscriptionId] = subscriptionInfoWrapper(subscriptionId, subscription) + } else { + if (server.pushSubscriptions[subscriptionId].sockets.size === 0) { + server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => { + if (!server.subscribersByChannelID[channelID]) return + server.subscribersByChannelID[channelID].delete(server.pushSubscriptions[subscriptionId]) + }) + } + } + if (socket.pushSubscriptionId) { + if (socket.pushSubscriptionId === subscriptionId) return + const oldSubscriptionId = socket.pushSubscriptionId + server.pushSubscriptions[oldSubscriptionId].sockets.delete(socket) + if (server.pushSubscriptions[oldSubscriptionId].sockets.size === 0) { + server.pushSubscriptions[oldSubscriptionId].subscriptions.forEach((channelID) => { + if (!server.subscribersByChannelID[channelID]) { + server.subscribersByChannelID[channelID] = new Set() + } + server.subscribersByChannelID[channelID].add(server.pushSubscriptions[oldSubscriptionId]) + }) + } + } + socket.pushSubscriptionId = subscriptionId + server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => { + server.subscribersByChannelID?.[channelID].delete(server.pushSubscriptions[subscriptionId]) + }) + server.pushSubscriptions[subscriptionId].sockets.add(socket) + socket.subscriptions?.forEach(channelID => { + server.pushSubscriptions[subscriptionId].subscriptions.add(channelID) + }) }, [PUSH_SERVER_ACTION_TYPE.DELETE_SUBSCRIPTION] (payload) { const socket = this const subscriptionId = JSON.parse(payload) + // TODO delete socket.server.pushSubscriptions[subscriptionId] } } diff --git a/backend/server.js b/backend/server.js index b20cb18e52..5a2a8f1cd6 100644 --- a/backend/server.js +++ b/backend/server.js @@ -21,6 +21,7 @@ import { pushServerActionhandlers } from './push.js' // $FlowFixMe[cannot-resolve-module] import { webcrypto } from 'node:crypto' import { GIMessage } from '~/shared/domains/chelonia/GIMessage.js' +import type { SubMessage, UnsubMessage } from '~/shared/pubsub.js' const { CONTRACTS_VERSION, GI_VERSION } = process.env @@ -206,6 +207,33 @@ sbp('okTurtles.data/set', PUBSUB_INSTANCE, createServer(hapi.listener, { socket.send(createNotification(NOTIFICATION_TYPE.VERSION_INFO, versionInfo)) } }, + socketHandlers: { + close () { + const socket = this + const { server } = this + + const subscriptionId = socket.pushSubscriptionId + + if (!subscriptionId) return + delete socket.pushSubscriptionId + + if (!server.pushSubscriptions[subscriptionId]) return + + server.pushSubscriptions[subscriptionId].sockets.delete(socket) + delete socket.pushSubscriptionId + + if (server.pushSubscriptions[subscriptionId].sockets.size === 0) return + server.pushSubscriptions[subscriptionId].sockets.delete(socket) + if (server.pushSubscriptions[subscriptionId].sockets.size === 0) { + server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => { + if (!server.subscribersByChannelID[channelID]) { + server.subscribersByChannelID[channelID] = new Set() + } + server.subscribersByChannelID[channelID].add(server.pushSubscriptions[subscriptionId]) + }) + } + } + }, messageHandlers: { [REQUEST_TYPE.PUSH_ACTION]: async function ({ data }) { const socket = this @@ -229,6 +257,30 @@ sbp('okTurtles.data/set', PUBSUB_INSTANCE, createServer(hapi.listener, { } else { socket.send(createPushErrorResponse({ message: `No handler for the '${action}' action` })) } + }, + [NOTIFICATION_TYPE.SUB] ({ channelID }: SubMessage) { + const socket = this + const { server } = this + + if (!socket.pushSubscriptionId) return + if (!server.pushSubscriptions[socket.pushSubscriptionId]) { + delete socket.pushSubscriptionId + return + } + + server.pushSubscriptions[socket.pushSubscriptionId].subscriptions.add(channelID) + }, + [NOTIFICATION_TYPE.UNSUB] ({ channelID }: UnsubMessage) { + const socket = this + const { server } = this + + if (!socket.pushSubscriptionId) return + if (!server.pushSubscriptions[socket.pushSubscriptionId]) { + delete socket.pushSubscriptionId + return + } + + server.pushSubscriptions[socket.pushSubscriptionId].subscriptions.delete(channelID) } } })) diff --git a/backend/vapid.js b/backend/vapid.js index 79e86b22e6..37903c6dbb 100644 --- a/backend/vapid.js +++ b/backend/vapid.js @@ -52,7 +52,7 @@ export const initVapid = async () => { vapidPublicKey = vapidKeyPair[1] } -const generateJwt = async (endpoint) => { +const generateJwt = async (endpoint: URL): Promise => { const privateKey = await crypto.subtle.importKey( 'jwk', vapidPrivateKey, @@ -67,6 +67,7 @@ const generateJwt = async (endpoint) => { const header = Buffer.from(JSON.stringify( Object.fromEntries([['typ', 'JWT'], ['alg', 'ES256']]) + // $FlowFixMe[incompatible-call] )).toString('base64url') const body = Buffer.from(JSON.stringify( Object.fromEntries([ @@ -76,6 +77,7 @@ const generateJwt = async (endpoint) => { ['nbf', now - 60], ['sub', vapid.VAPID_EMAIL] ]) + // $FlowFixMe[incompatible-call] )).toString('base64url') const signature = Buffer.from( @@ -91,7 +93,7 @@ const generateJwt = async (endpoint) => { export const getVapidPublicKey = (): string => vapidPublicKey -export const vapidAuthorization = async (endpoint: string): Promise => { +export const vapidAuthorization = async (endpoint: URL): Promise => { const jwt = await generateJwt(endpoint) return `vapid t=${jwt}, k=${vapidPublicKey}` } diff --git a/frontend/controller/service-worker.js b/frontend/controller/service-worker.js index f3d08b3c36..c5e5466e40 100644 --- a/frontend/controller/service-worker.js +++ b/frontend/controller/service-worker.js @@ -91,13 +91,10 @@ sbp('sbp/selectors/register', { return } - const pubsub = sbp('okTurtles.data/get', PUBSUB_INSTANCE) - if (!pubsub) return // TODO: This needs to be moved into the service worker - // proper. pubsub will be undefined in this context. const existingSubscription = await registration.pushManager.getSubscription().then((subscription) => { if ( !subscription || - (subscription.expirationTime != null && + (subscription.expirationTime != null && subscription.expirationTime <= Date.now()) ) { console.info( @@ -111,9 +108,10 @@ sbp('sbp/selectors/register', { }) // TODO: Consider throwing an exception here - if (!existingSubscription) return + if (!existingSubscription) return false await sbp('push/reportExistingSubscription', existingSubscription.toJSON()) + return true }) }, 'service-worker/update': async function () { diff --git a/frontend/controller/serviceworkers/push.js b/frontend/controller/serviceworkers/push.js index 61b8ea4b31..b3815570c5 100644 --- a/frontend/controller/serviceworkers/push.js +++ b/frontend/controller/serviceworkers/push.js @@ -1,6 +1,6 @@ import { PUBSUB_INSTANCE } from '@controller/instance-keys.js' import sbp from '@sbp/sbp' -import { PUSH_SERVER_ACTION_TYPE, REQUEST_TYPE, createMessage } from '~/shared/pubsub.js' +import { NOTIFICATION_TYPE, PUSH_SERVER_ACTION_TYPE, REQUEST_TYPE, createMessage } from '~/shared/pubsub.js' export default (sbp('sbp/selectors/register', { 'push/getSubscriptionOptions': (() => { @@ -48,6 +48,7 @@ export default (sbp('sbp/selectors/register', { return result } })(), + // eslint-disable-next-line require-await 'push/reportExistingSubscription': async (subscriptionInfo) => { const pubsub = sbp('okTurtles.data/get', PUBSUB_INSTANCE) if (!pubsub) throw new Error('Missing pubsub instance') @@ -57,7 +58,7 @@ export default (sbp('sbp/selectors/register', { throw new Error('WebSocket connection is not open') } - await pubsub.socket.send(createMessage( + pubsub.socket.send(createMessage( REQUEST_TYPE.PUSH_ACTION, { action: PUSH_SERVER_ACTION_TYPE.STORE_SUBSCRIPTION, payload: subscriptionInfo } )) @@ -66,8 +67,10 @@ export default (sbp('sbp/selectors/register', { self.addEventListener('push', function (event) { // PushEvent reference: https://developer.mozilla.org/en-US/docs/Web/API/PushEvent - const data = event.data.text() - event.waitUntil(sbp('chelonia/handleEvent', data)) + const data = event.data.json() + if (data.type === NOTIFICATION_TYPE.ENTRY) { + event.waitUntil(sbp('chelonia/handleEvent', data.data)) + } }) self.addEventListener('pushsubscriptionchange', async function (event) {