Skip to content

Commit

Permalink
fix(crux): ws session expire unsub handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
robot9706 committed Sep 14, 2023
1 parent ab2711d commit 102ba04
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 26 deletions.
4 changes: 3 additions & 1 deletion web/crux/src/app/config.bundle/config.bundle.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ export default class ConfigBundleWebSocketGateway {
@SocketClient() client: WsClient,
@ConfigBundleId() configBundleId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<void> {
): Promise<boolean> {
const data = await this.service.onEditorLeft(configBundleId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
data,
}
subscription.sendToAllExcept(client, message)

return true
}

@SubscribeMessage(WS_TYPE_PATCH_CONFIG_BUNDLE)
Expand Down
4 changes: 3 additions & 1 deletion web/crux/src/app/deploy/deploy.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export default class DeployWebSocketGateway {
@SocketClient() client: WsClient,
@DeploymentId() deploymentId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<void> {
): Promise<boolean> {
const data = await this.service.onEditorLeft(deploymentId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
Expand All @@ -150,6 +150,8 @@ export default class DeployWebSocketGateway {
this.deploymentEventCompleters.get(key).next(undefined)
this.deploymentEventCompleters.delete(key)
}

return true
}

@AuditLogLevel('disabled')
Expand Down
9 changes: 8 additions & 1 deletion web/crux/src/app/token/jwt-auth.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Request as ExpressRequest } from 'express'
import { RequestAuthenticationData } from 'src/domain/identity'
import { CruxUnauthorizedException } from 'src/exception/crux-exception'
import KratosService, { hasKratosSession } from 'src/services/kratos.service'
import { WsClient } from 'src/websockets/common'
import { WS_TYPE_UNSUBSCRIBE, WsClient } from 'src/websockets/common'

export type AuthStrategyType = 'user-token' | 'deploy-token' | 'disabled'
export const AUTH_STRATEGY = 'auth-strategy'
Expand Down Expand Up @@ -109,6 +109,13 @@ export default class JwtAuthGuard extends AuthGuard('jwt') {

private canActivateWs(context: ExecutionContext): boolean {
const client: WsClient = context.switchToWs().getClient()
if (client.disconnecting) {
// NOTE(@robot9706): When a client is disconnecting disallow any handlers
// except WsUnsubscribe for cleanup
const message = this.reflector.get('message', context.getHandler())
return message === WS_TYPE_UNSUBSCRIBE
}

const req = client.connectionRequest as AuthorizedHttpRequest

const now = new Date().getTime()
Expand Down
4 changes: 3 additions & 1 deletion web/crux/src/app/version/version.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ export default class VersionWebSocketGateway {
@SocketClient() client: WsClient,
@VersionId() versionId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<void> {
): Promise<boolean> {
const data = await this.service.onEditorLeft(versionId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
data,
}
subscription.sendToAllExcept(client, message)

return true
}

@AuditLogLevel('disabled')
Expand Down
4 changes: 4 additions & 0 deletions web/crux/src/websockets/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ export type WsClient = WebSocket & {
connectionRequest: AuthorizedHttpRequest
sendWsMessage: WsSendClientMessage
subscriptions: Map<string, WsSubscription>

// NOTE(@robot9706): Used by jwt-auth.guard WS strategy to be able to call
// unsubscribe methods when the session is invalid (for cleanup)
disconnecting: boolean
}

export type WsTransform = (data: any | Promise<any> | Observable<any>) => Observable<any>
Expand Down
15 changes: 11 additions & 4 deletions web/crux/src/websockets/dyo.ws.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ export default class DyoWsAdapter extends AbstractWsAdapter {
if (message.type === WS_TYPE_SUBSCRIBE) {
res = await route.onSubscribe(client, match, message)
} else if (message.type === WS_TYPE_UNSUBSCRIBE) {
res = route.onUnsubscribe(client, match, message)
res = await route.onUnsubscribe(client, match, message)
} else {
const err = new Error(`Invalid subscription type ${message.type}`)
this.logger.verbose(err)
Expand All @@ -280,8 +280,9 @@ export default class DyoWsAdapter extends AbstractWsAdapter {
client.token = uuid()
client.connectionRequest = req as AuthorizedHttpRequest
client.subscriptions = new Map()
client.disconnecting = false
client.sendWsMessage = msg => {
if (!msg || client.readyState !== WebSocketReadyState.OPEN_STATE) {
if (!msg || client.readyState !== WebSocketReadyState.OPEN_STATE || client.disconnecting) {
return
}

Expand All @@ -298,11 +299,17 @@ export default class DyoWsAdapter extends AbstractWsAdapter {
this.logger.log(`Connected ${client.token} clients: ${this.server?.clients?.size}`)
}

private onClientDisconnect(client: WsClient) {
private async onClientDisconnect(client: WsClient) {
client.disconnecting = true

this.logger.log(`Disconnected ${client.token} clients: ${this.server?.clients?.size}`)
WsMetrics.connections().dec()

this.routes.forEach(it => it.onClientDisconnect(client))
await Promise.all(this.routes.map(it => it.onClientDisconnect(client)))

if (client?.subscriptions?.size > 0) {
this.logger.warn(`Client ${client.token} failed to cleanup all subscriptions!`)
}

client?.setup?.onClientDisconnect()
}
Expand Down
25 changes: 15 additions & 10 deletions web/crux/src/websockets/namespace.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from '@nestjs/common'
import { EMPTY, Observable, Subject, filter, first, map, mergeWith, of, takeUntil } from 'rxjs'
import { EMPTY, Observable, Subject, filter, first, firstValueFrom, map, mergeWith, of, takeUntil } from 'rxjs'
import {
SubscriptionMessage,
WS_TYPE_SUBBED,
Expand Down Expand Up @@ -87,7 +87,7 @@ export default class WsNamespace implements WsSubscription {
return of(res).pipe(first())
}

onUnsubscribe(client: WsClient, message: WsMessage<SubscriptionMessage> | null): UnsubcribeResult {
async onUnsubscribe(client: WsClient, message: WsMessage<SubscriptionMessage> | null): Promise<UnsubcribeResult> {
const { token } = client

const resources = this.clients.get(token)
Expand All @@ -112,17 +112,22 @@ export default class WsNamespace implements WsSubscription {

const { unsubscribe, transform, completer } = resources

completer.next(undefined)
this.clients.delete(token)

if (unsubscribe) {
transform(unsubscribe(message))
.pipe(first())
.subscribe(() => client.subscriptions.delete(this.path))
} else {
client.subscriptions.delete(this.path)
const unsubscribeResult = await firstValueFrom(transform(unsubscribe(message)))
if (!unsubscribeResult) {
this.logger.warn(`${this.path} @WsUnsubscribe returned undefined`)
return {
res: null,
shouldRemove: this.clients.size < 1,
}
}
}

client.subscriptions.delete(this.path)

completer.next(undefined)
this.clients.delete(token)

this.logger.verbose(`${token} unsubscribed`)

return {
Expand Down
16 changes: 8 additions & 8 deletions web/crux/src/websockets/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ export default class WsRoute {
return ns.onSubscribe(client, callbacks, message)
}

onUnsubscribe(
async onUnsubscribe(
client: WsClient,
match: WsRouteMatch,
message: WsMessage<SubscriptionMessage>,
): Observable<WsMessage<SubscriptionMessage>> {
): Promise<Observable<WsMessage<SubscriptionMessage>>> {
const { path } = match

const res = this.removeClientFromNamespace(client, path, message)
const res = await this.removeClientFromNamespace(client, path, message)

return res ? of(res) : EMPTY
}
Expand Down Expand Up @@ -169,10 +169,10 @@ export default class WsRoute {
})
}

onClientDisconnect(client: WsClient) {
async onClientDisconnect(client: WsClient): Promise<void> {
const subscriptionPaths = Array.from(client.subscriptions.keys())

Array.from(subscriptionPaths).forEach(it => this.removeClientFromNamespace(client, it, null))
await Promise.all(Array.from(subscriptionPaths).map(it => this.removeClientFromNamespace(client, it, null)))

this.callbacks.delete(client.token)
}
Expand All @@ -193,17 +193,17 @@ export default class WsRoute {
return ns
}

private removeClientFromNamespace(
private async removeClientFromNamespace(
client: WsClient,
namespacePath: string,
message: WsMessage<SubscriptionMessage> | null,
): WsMessage<SubscriptionMessage> {
): Promise<WsMessage<SubscriptionMessage>> {
const ns = this.namespaces.get(namespacePath)
if (!ns) {
return null
}

const { res, shouldRemove } = ns.onUnsubscribe(client, message)
const { res, shouldRemove } = await ns.onUnsubscribe(client, message)
if (shouldRemove) {
this.namespaces.delete(namespacePath)
this.logger.verbose(`Namespace deleted ${namespacePath}`)
Expand Down

0 comments on commit 102ba04

Please sign in to comment.