Skip to content

Commit

Permalink
Server push events
Browse files Browse the repository at this point in the history
  • Loading branch information
corrideat committed Nov 6, 2024
1 parent 74b2dfa commit a2a0d37
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 18 deletions.
23 changes: 17 additions & 6 deletions backend/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
} from '~/shared/pubsub.js'

import type { JSONType, JSONObject } from '~/shared/types.js'
import { postEvent } from './push.js'

const { bold } = require('chalk')
const WebSocket = require('ws')
Expand Down Expand Up @@ -105,7 +106,7 @@ export function createServer (httpServer: Object, options?: Object = {}): Object
server.channels = new Set()
server.customServerEventHandlers = { ...options.serverHandlers }
server.customSocketEventHandlers = { ...options.socketHandlers }
server.messageHandlers = { ...defaultMessageHandlers, ...options.messageHandlers }
server.customMessageHandlers = { ...options.messageHandlers }
server.pingIntervalID = undefined
server.subscribersByChannelID = Object.create(null)
server.pushSubscriptions = Object.create(null)
Expand Down Expand Up @@ -231,14 +232,16 @@ const defaultSocketEventHandlers = {
}
// The socket can be marked as active since it just received a message.
socket.activeSinceLastPing = true
const handler = server.messageHandlers[msg.type]
const defaultHandler = defaultMessageHandlers[msg.type]
const customHandler = server.customMessageHandlers[msg.type]

if (handler) {
if (defaultHandler || customHandler) {
try {
handler.call(socket, msg)
defaultHandler?.call(socket, msg)
customHandler?.call(socket, msg)
} catch (error) {
// Log the error message and stack trace but do not send it to the client.
log.error(error, 'onMesage')
log.error(error, 'onMessage')
server.rejectMessageAndTerminateSocket(msg, socket)
}
} else {
Expand Down Expand Up @@ -323,8 +326,16 @@ const publicMethods = {
const server = this

for (const client of to || server.clients) {
const msg = typeof message === 'string' ? message : JSON.stringify(message)
if (client.endpoint) {
console.error('@@@@POSTING PUSH EVENT')
postEvent(client, msg).catch(e => {
console.error(e, 'Error posting push notification')
})
continue
}
if (client.readyState === WebSocket.OPEN && client !== except) {
client.send(typeof message === 'string' ? message : JSON.stringify(message))
client.send(msg)
}
}
},
Expand Down
40 changes: 39 additions & 1 deletion backend/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ export const subscriptionInfoWrapper = (subcriptionId: string, subscriptionInfo:
return resultPromise
}
})()
},
'sockets': {
value: new Set()
},
'subscriptions': {
value: new Set()
}
})

Expand All @@ -115,15 +121,47 @@ export const pushServerActionhandlers: any = {
},
async [PUSH_SERVER_ACTION_TYPE.STORE_SUBSCRIPTION] (payload) {
const socket = this
const { server } = socket
const subscription = payload
const subscriptionId = await getSubscriptionId(subscription)

socket.server.pushSubscriptions[subscriptionId] = subscriptionInfoWrapper(subscriptionId, subscription)
if (!server.pushSubscriptions[subscriptionId]) {
server.pushSubscriptions[subscriptionId] = subscriptionInfoWrapper(subscriptionId, subscription)
} else {
if (server.pushSubscriptions[subscriptionId].sockets.size === 0) {
server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => {
if (!server.subscribersByChannelID[channelID]) return
server.subscribersByChannelID[channelID].delete(server.pushSubscriptions[subscriptionId])
})
}
}
if (socket.pushSubscriptionId) {
if (socket.pushSubscriptionId === subscriptionId) return
const oldSubscriptionId = socket.pushSubscriptionId
server.pushSubscriptions[oldSubscriptionId].sockets.delete(socket)
if (server.pushSubscriptions[oldSubscriptionId].sockets.size === 0) {
server.pushSubscriptions[oldSubscriptionId].subscriptions.forEach((channelID) => {
if (!server.subscribersByChannelID[channelID]) {
server.subscribersByChannelID[channelID] = new Set()
}
server.subscribersByChannelID[channelID].add(server.pushSubscriptions[oldSubscriptionId])
})
}
}
socket.pushSubscriptionId = subscriptionId
server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => {
server.subscribersByChannelID?.[channelID].delete(server.pushSubscriptions[subscriptionId])
})
server.pushSubscriptions[subscriptionId].sockets.add(socket)
socket.subscriptions?.forEach(channelID => {
server.pushSubscriptions[subscriptionId].subscriptions.add(channelID)
})
},
[PUSH_SERVER_ACTION_TYPE.DELETE_SUBSCRIPTION] (payload) {
const socket = this
const subscriptionId = JSON.parse(payload)

// TODO
delete socket.server.pushSubscriptions[subscriptionId]
}
}
Expand Down
52 changes: 52 additions & 0 deletions backend/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { pushServerActionhandlers } from './push.js'
// $FlowFixMe[cannot-resolve-module]
import { webcrypto } from 'node:crypto'
import { GIMessage } from '~/shared/domains/chelonia/GIMessage.js'
import type { SubMessage, UnsubMessage } from '~/shared/pubsub.js'

const { CONTRACTS_VERSION, GI_VERSION } = process.env

Expand Down Expand Up @@ -206,6 +207,33 @@ sbp('okTurtles.data/set', PUBSUB_INSTANCE, createServer(hapi.listener, {
socket.send(createNotification(NOTIFICATION_TYPE.VERSION_INFO, versionInfo))
}
},
socketHandlers: {
close () {
const socket = this
const { server } = this

const subscriptionId = socket.pushSubscriptionId

if (!subscriptionId) return
delete socket.pushSubscriptionId

if (!server.pushSubscriptions[subscriptionId]) return

server.pushSubscriptions[subscriptionId].sockets.delete(socket)
delete socket.pushSubscriptionId

if (server.pushSubscriptions[subscriptionId].sockets.size === 0) return
server.pushSubscriptions[subscriptionId].sockets.delete(socket)
if (server.pushSubscriptions[subscriptionId].sockets.size === 0) {
server.pushSubscriptions[subscriptionId].subscriptions.forEach((channelID) => {
if (!server.subscribersByChannelID[channelID]) {
server.subscribersByChannelID[channelID] = new Set()
}
server.subscribersByChannelID[channelID].add(server.pushSubscriptions[subscriptionId])
})
}
}
},
messageHandlers: {
[REQUEST_TYPE.PUSH_ACTION]: async function ({ data }) {
const socket = this
Expand All @@ -229,6 +257,30 @@ sbp('okTurtles.data/set', PUBSUB_INSTANCE, createServer(hapi.listener, {
} else {
socket.send(createPushErrorResponse({ message: `No handler for the '${action}' action` }))
}
},
[NOTIFICATION_TYPE.SUB] ({ channelID }: SubMessage) {
const socket = this
const { server } = this

if (!socket.pushSubscriptionId) return
if (!server.pushSubscriptions[socket.pushSubscriptionId]) {
delete socket.pushSubscriptionId
return
}

server.pushSubscriptions[socket.pushSubscriptionId].subscriptions.add(channelID)
},
[NOTIFICATION_TYPE.UNSUB] ({ channelID }: UnsubMessage) {
const socket = this
const { server } = this

if (!socket.pushSubscriptionId) return
if (!server.pushSubscriptions[socket.pushSubscriptionId]) {
delete socket.pushSubscriptionId
return
}

server.pushSubscriptions[socket.pushSubscriptionId].subscriptions.delete(channelID)
}
}
}))
Expand Down
6 changes: 4 additions & 2 deletions backend/vapid.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export const initVapid = async () => {
vapidPublicKey = vapidKeyPair[1]
}

const generateJwt = async (endpoint) => {
const generateJwt = async (endpoint: URL): Promise<string> => {
const privateKey = await crypto.subtle.importKey(
'jwk',
vapidPrivateKey,
Expand All @@ -67,6 +67,7 @@ const generateJwt = async (endpoint) => {

const header = Buffer.from(JSON.stringify(
Object.fromEntries([['typ', 'JWT'], ['alg', 'ES256']])
// $FlowFixMe[incompatible-call]
)).toString('base64url')
const body = Buffer.from(JSON.stringify(
Object.fromEntries([
Expand All @@ -76,6 +77,7 @@ const generateJwt = async (endpoint) => {
['nbf', now - 60],
['sub', vapid.VAPID_EMAIL]
])
// $FlowFixMe[incompatible-call]
)).toString('base64url')

const signature = Buffer.from(
Expand All @@ -91,7 +93,7 @@ const generateJwt = async (endpoint) => {

export const getVapidPublicKey = (): string => vapidPublicKey

export const vapidAuthorization = async (endpoint: string): Promise<string> => {
export const vapidAuthorization = async (endpoint: URL): Promise<string> => {
const jwt = await generateJwt(endpoint)
return `vapid t=${jwt}, k=${vapidPublicKey}`
}
8 changes: 3 additions & 5 deletions frontend/controller/service-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,10 @@ sbp('sbp/selectors/register', {
return
}

const pubsub = sbp('okTurtles.data/get', PUBSUB_INSTANCE)
if (!pubsub) return // TODO: This needs to be moved into the service worker
// proper. pubsub will be undefined in this context.
const existingSubscription = await registration.pushManager.getSubscription().then((subscription) => {
if (
!subscription ||
(subscription.expirationTime != null &&
(subscription.expirationTime != null &&
subscription.expirationTime <= Date.now())
) {
console.info(
Expand All @@ -111,9 +108,10 @@ sbp('sbp/selectors/register', {
})

// TODO: Consider throwing an exception here
if (!existingSubscription) return
if (!existingSubscription) return false

await sbp('push/reportExistingSubscription', existingSubscription.toJSON())
return true
})
},
'service-worker/update': async function () {
Expand Down
11 changes: 7 additions & 4 deletions frontend/controller/serviceworkers/push.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PUBSUB_INSTANCE } from '@controller/instance-keys.js'
import sbp from '@sbp/sbp'
import { PUSH_SERVER_ACTION_TYPE, REQUEST_TYPE, createMessage } from '~/shared/pubsub.js'
import { NOTIFICATION_TYPE, PUSH_SERVER_ACTION_TYPE, REQUEST_TYPE, createMessage } from '~/shared/pubsub.js'

export default (sbp('sbp/selectors/register', {
'push/getSubscriptionOptions': (() => {
Expand Down Expand Up @@ -48,6 +48,7 @@ export default (sbp('sbp/selectors/register', {
return result
}
})(),
// eslint-disable-next-line require-await
'push/reportExistingSubscription': async (subscriptionInfo) => {
const pubsub = sbp('okTurtles.data/get', PUBSUB_INSTANCE)
if (!pubsub) throw new Error('Missing pubsub instance')
Expand All @@ -57,7 +58,7 @@ export default (sbp('sbp/selectors/register', {
throw new Error('WebSocket connection is not open')
}

await pubsub.socket.send(createMessage(
pubsub.socket.send(createMessage(
REQUEST_TYPE.PUSH_ACTION,
{ action: PUSH_SERVER_ACTION_TYPE.STORE_SUBSCRIPTION, payload: subscriptionInfo }
))
Expand All @@ -66,8 +67,10 @@ export default (sbp('sbp/selectors/register', {

self.addEventListener('push', function (event) {
// PushEvent reference: https://developer.mozilla.org/en-US/docs/Web/API/PushEvent
const data = event.data.text()
event.waitUntil(sbp('chelonia/handleEvent', data))
const data = event.data.json()
if (data.type === NOTIFICATION_TYPE.ENTRY) {
event.waitUntil(sbp('chelonia/handleEvent', data.data))
}
})

self.addEventListener('pushsubscriptionchange', async function (event) {
Expand Down

0 comments on commit a2a0d37

Please sign in to comment.