Skip to content

Commit

Permalink
fix(web): websocket close
Browse files Browse the repository at this point in the history
  • Loading branch information
robot9706 committed Sep 18, 2023
1 parent 8834e98 commit e9873e5
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 67 deletions.
2 changes: 1 addition & 1 deletion web/crux-ui/e2e/utils/test.fixture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ test.beforeEach(async ({ page }, testInfo) => {
return
}

console.log(`[${testInfo.title}] ${type.toUpperCase()} ${it.text()}`)
console.info(`[${testInfo.title}] ${type.toUpperCase()} ${it.text()}`)
})

if (CPU_THROTTLE) {
Expand Down
11 changes: 10 additions & 1 deletion web/crux-ui/src/providers/websocket.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { defaultWsErrorHandler } from '@app/errors'
import { ROUTE_LOGIN } from '@app/routes'
import { isServerSide } from '@app/utils'
import WebSocketClient from '@app/websockets/websocket-client'
import useTranslation from 'next-translate/useTranslation'
import { useRouter } from 'next/router'
import React, { useState } from 'react'

interface WebSocketContextInterface {
Expand All @@ -14,6 +16,7 @@ export const WebSocketProvider = (props: React.PropsWithChildren<{}>) => {
const { children } = props

const { t } = useTranslation('common')
const router = useRouter()

const [wsClient] = useState(() => {
if (isServerSide()) {
Expand All @@ -23,7 +26,13 @@ export const WebSocketProvider = (props: React.PropsWithChildren<{}>) => {
const client = new WebSocketClient()

const wsErrorHandler = defaultWsErrorHandler(t)
client.setErrorHandler(msg => wsErrorHandler(msg))
client.setErrorHandler(msg => {
if (msg.status === WebSocketClient.ERROR_SESSION_EXPIRED) {
router.push(ROUTE_LOGIN)
return
}
wsErrorHandler(msg)
})

return client
})
Expand Down
19 changes: 15 additions & 4 deletions web/crux-ui/src/websockets/websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import WebSocketClientEndpoint from './websocket-client-endpoint'
import WebSocketClientRoute from './websocket-client-route'

class WebSocketClient {
public static ERROR_SESSION_EXPIRED = 4000

private logger = new Logger('WebSocketClient') // need to be explicit string because of production build uglification

private socket?: WebSocket
Expand Down Expand Up @@ -91,7 +93,9 @@ class WebSocketClient {

const ws = this.socket
this.socket = null
ws.close()
if (ws.readyState === WebSocket.OPEN) {
ws.close()
}
this.destroyListeners?.call(null)
}

Expand Down Expand Up @@ -203,16 +207,23 @@ class WebSocketClient {
this.routes.forEach(it => it.onSocketOpen())
}

const onClose = () => {
const onClose = (it: CloseEvent) => {
if (!resolved) {
resolved = true
setTimeout(() => resolve(false), failTimeout)
}

this.logger.info('Disconnected')

this.routes.forEach(it => it.onSocketClose())
this.reconnect()
this.routes.forEach(route => route.onSocketClose())
if (it.code === WebSocketClient.ERROR_SESSION_EXPIRED) {
this.errorHandler({
status: WebSocketClient.ERROR_SESSION_EXPIRED,
message: it.reason,
})
} else {
this.reconnect()
}
}

const onError = ev => {
Expand Down
4 changes: 1 addition & 3 deletions web/crux/src/app/config.bundle/config.bundle.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ export default class ConfigBundleWebSocketGateway {
@SocketClient() client: WsClient,
@ConfigBundleId() configBundleId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<boolean> {
) {
const data = await this.service.onEditorLeft(configBundleId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
data,
}
subscription.sendToAllExcept(client, message)

return true
}

@SubscribeMessage(WS_TYPE_PATCH_CONFIG_BUNDLE)
Expand Down
4 changes: 1 addition & 3 deletions web/crux/src/app/deploy/deploy.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export default class DeployWebSocketGateway {
@SocketClient() client: WsClient,
@DeploymentId() deploymentId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<boolean> {
) {
const data = await this.service.onEditorLeft(deploymentId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
Expand All @@ -150,8 +150,6 @@ export default class DeployWebSocketGateway {
this.deploymentEventCompleters.get(key).next(undefined)
this.deploymentEventCompleters.delete(key)
}

return true
}

@AuditLogLevel('disabled')
Expand Down
5 changes: 1 addition & 4 deletions web/crux/src/app/token/jwt-auth.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ export default class JwtAuthGuard extends AuthGuard('jwt') {

private async canActivateWs(context: ExecutionContext): Promise<boolean> {
const client: WsClient = context.switchToWs().getClient()
const message = this.reflector.get('message', context.getHandler())
if (client.disconnecting) {
// NOTE(@robot9706): When a client is disconnecting disallow any handlers
// except WsUnsubscribe for cleanup
const message = this.reflector.get('message', context.getHandler())
return message === WS_TYPE_UNSUBSCRIBE
}

Expand All @@ -124,9 +124,6 @@ export default class JwtAuthGuard extends AuthGuard('jwt') {
if (!sessionExpiresAt || sessionExpiresAt <= now) {
this.logger.debug('WebSocket session expired.')

await client.unsubscribeAll()

client.close()
throw new CruxUnauthorizedException()
}

Expand Down
4 changes: 1 addition & 3 deletions web/crux/src/app/version/version.ws.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,13 @@ export default class VersionWebSocketGateway {
@SocketClient() client: WsClient,
@VersionId() versionId: string,
@SocketSubscription() subscription: WsSubscription,
): Promise<boolean> {
) {
const data = await this.service.onEditorLeft(versionId, client.token)
const message: WsMessage<EditorLeftMessage> = {
type: WS_TYPE_EDITOR_LEFT,
data,
}
subscription.sendToAllExcept(client, message)

return true
}

@AuditLogLevel('disabled')
Expand Down
3 changes: 2 additions & 1 deletion web/crux/src/websockets/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export type WsClient = WebSocket & {
connectionRequest: AuthorizedHttpRequest
sendWsMessage: WsSendClientMessage
subscriptions: Map<string, WsSubscription>
unsubscribeAll: () => Promise<void>
unsubscribeAll: VoidFunction
expireTimeout: NodeJS.Timeout

// NOTE(@robot9706): Used by jwt-auth.guard WS strategy to be able to call
// unsubscribe methods when the session is invalid (for cleanup)
Expand Down
37 changes: 29 additions & 8 deletions web/crux/src/websockets/dyo.ws.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
catchError,
filter,
first,
forkJoin,
from,
fromEvent,
mergeAll,
Expand Down Expand Up @@ -48,6 +49,8 @@ export enum WebSocketReadyState {
CLOSED_STATE = 3,
}

const ERROR_SESSION_EXPIRED = 4000

export default class DyoWsAdapter extends AbstractWsAdapter {
private readonly logger = new Logger(DyoWsAdapter.name)

Expand Down Expand Up @@ -266,7 +269,7 @@ export default class DyoWsAdapter extends AbstractWsAdapter {
if (message.type === WS_TYPE_SUBSCRIBE) {
res = await route.onSubscribe(client, match, message)
} else if (message.type === WS_TYPE_UNSUBSCRIBE) {
res = await route.onUnsubscribe(client, match, message)
res = route.onUnsubscribe(client, match, message)
} else {
const err = new Error(`Invalid subscription type ${message.type}`)
this.logger.verbose(err)
Expand All @@ -293,30 +296,48 @@ export default class DyoWsAdapter extends AbstractWsAdapter {
client.on(CLOSE_EVENT, () => this.onClientDisconnect(client))
client.unsubscribeAll = () => this.onClientDisconnect(client)

this.startClientExpiryTimer(client)

client.setup = new WsClientSetup(client, client.token, () => this.bindClientMessageHandlers(client))
client.setup.start()

WsMetrics.connections().inc()
this.logger.log(`Connected ${client.token} clients: ${this.server?.clients?.size}`)
}

private async onClientDisconnect(client: WsClient): Promise<void> {
private startClientExpiryTimer(client: WsClient) {
const { sessionExpiresAt } = client.connectionRequest

const now = new Date().getTime()
const expireTime = sessionExpiresAt - now

client.expireTimeout = setTimeout(() => {
this.logger.warn(`Session expired for ${client.token}`)
client.unsubscribeAll()
client.close(ERROR_SESSION_EXPIRED, 'Expired')
}, expireTime)
}

private onClientDisconnect(client: WsClient) {
if (client.disconnecting) {
return
}

clearTimeout(client.expireTimeout)

client.disconnecting = true

this.logger.log(`Disconnected ${client.token} clients: ${this.server?.clients?.size}`)
WsMetrics.connections().dec()

await Promise.all(this.routes.map(it => it.onClientDisconnect(client)))

if (client?.subscriptions?.size > 0) {
this.logger.warn(`Client ${client.token} failed to cleanup all subscriptions!`)
}
const routeDisconnects = this.routes.map(it => it.onClientDisconnect(client))
forkJoin(routeDisconnects).subscribe(() => {
if (client?.subscriptions?.size > 0) {
this.logger.warn(`Client ${client.token} failed to cleanup all subscriptions!`)
}

client?.setup?.onClientDisconnect()
client?.setup?.onClientDisconnect()
})
}

private findRouteByPath(path: string): [WsRoute, WsRouteMatch] {
Expand Down
12 changes: 6 additions & 6 deletions web/crux/src/websockets/namespace.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,13 @@ describe('WsNamespace', () => {
})

it('should return UnsubscribeResult with a null when the client is not subscribed', async () => {
const result = await namespace.onUnsubscribe(client, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(client, unsubscribeMessage))

expect(result.res).toBe(null)
})

it('should return UnsubscribeResult with shouldRemove true when the client is not subscribed and there is no other client subscribed', async () => {
const result = await namespace.onUnsubscribe(client, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(client, unsubscribeMessage))

expect(result.shouldRemove).toBe(true)
})
Expand Down Expand Up @@ -335,27 +335,27 @@ describe('WsNamespace', () => {
})

it('should return UnsubscribeResult with shouldRemove false when the client is not subscribed', async () => {
const result = await namespace.onUnsubscribe(client, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(client, unsubscribeMessage))

expect(result.shouldRemove).toBe(false)
})

it('should return UnsubscribeResult with shouldRemove false', async () => {
namespace.onSubscribe(client, callbacks, subscribeMessage)

const result = await namespace.onUnsubscribe(client, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(client, unsubscribeMessage))

expect(result.shouldRemove).toBe(false)
})

it('should return UnsubscribeResult with an observable containing the correct unsubbed message', async () => {
const result = await namespace.onUnsubscribe(subscribedClient, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(subscribedClient, unsubscribeMessage))

expect(result.res).toEqual(successfulUnsubscribeMessage)
})

it('should return UnsubscribeResult with shouldRemove true, when this was the last client', async () => {
const result = await namespace.onUnsubscribe(subscribedClient, unsubscribeMessage)
const result = await firstValueFrom(namespace.onUnsubscribe(subscribedClient, unsubscribeMessage))

expect(result.shouldRemove).toBe(true)
})
Expand Down
41 changes: 27 additions & 14 deletions web/crux/src/websockets/namespace.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from '@nestjs/common'
import { EMPTY, Observable, Subject, filter, first, firstValueFrom, map, mergeWith, of, takeUntil } from 'rxjs'
import { EMPTY, Observable, Subject, filter, first, map, mergeWith, of, takeUntil } from 'rxjs'
import {
SubscriptionMessage,
WS_TYPE_SUBBED,
Expand Down Expand Up @@ -87,16 +87,16 @@ export default class WsNamespace implements WsSubscription {
return of(res).pipe(first())
}

async onUnsubscribe(client: WsClient, message: WsMessage<SubscriptionMessage> | null): Promise<UnsubcribeResult> {
onUnsubscribe(client: WsClient, message: WsMessage<SubscriptionMessage> | null): Observable<UnsubcribeResult> {
const { token } = client

const resources = this.clients.get(token)
if (!resources) {
this.logger.warn(`undefined resource for '${token}'`)
return {
return of({
res: null,
shouldRemove: this.clients.size < 1,
}
})
}

// When the connection is killed, we get an empty message,
Expand All @@ -113,14 +113,27 @@ export default class WsNamespace implements WsSubscription {
const { unsubscribe, transform, completer } = resources

if (unsubscribe) {
const unsubscribeResult = await firstValueFrom(transform(unsubscribe(message)))
if (!unsubscribeResult) {
this.logger.warn(`${this.path} @WsUnsubscribe returned undefined`)
return {
res: null,
shouldRemove: this.clients.size < 1,
}
}
return transform(unsubscribe(message)).pipe(
first(),
map(() => {
client.subscriptions.delete(this.path)

completer.next(undefined)
this.clients.delete(token)

this.logger.verbose(`${token} unsubscribed`)

return {
res: {
type: WS_TYPE_UNSUBBED,
data: {
path: this.path,
},
},
shouldRemove: this.clients.size < 1,
}
}),
)
}

client.subscriptions.delete(this.path)
Expand All @@ -130,15 +143,15 @@ export default class WsNamespace implements WsSubscription {

this.logger.verbose(`${token} unsubscribed`)

return {
return of({
res: {
type: WS_TYPE_UNSUBBED,
data: {
path: this.path,
},
},
shouldRemove: this.clients.size < 1,
}
})
}

onMessage(client: WsClient, message: WsMessage): Observable<WsMessage> {
Expand Down
Loading

0 comments on commit e9873e5

Please sign in to comment.