Skip to content

Commit

Permalink
UBERF-8532: Rework how ping work (#7522)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo authored Dec 23, 2024
1 parent 4159c3f commit bb6ee39
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 36 deletions.
70 changes: 53 additions & 17 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
//

import { Analytics } from '@hcengineering/analytics'
import client, { ClientSocket, ClientSocketReadyState, type ClientFactoryOptions } from '@hcengineering/client'
import client, {
ClientSocket,
ClientSocketReadyState,
pingConst,
pongConst,
type ClientFactoryOptions
} from '@hcengineering/client'
import core, {
Account,
Class,
Expand Down Expand Up @@ -160,7 +166,7 @@ class Connection implements ClientConnection {
if (!this.closed) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
void this.sendRequest({
method: 'ping',
method: pingConst,
params: [],
once: true,
handleResult: async (result) => {
Expand Down Expand Up @@ -317,8 +323,8 @@ class Connection implements ClientConnection {
}
return
}
if (resp.result === 'ping') {
void this.sendRequest({ method: 'ping', params: [] })
if (resp.result === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
return
}
if (resp.id !== undefined) {
Expand Down Expand Up @@ -461,6 +467,27 @@ class Connection implements ClientConnection {
if (this.websocket !== wsocket) {
return
}
if (event.data === pongConst) {
this.pingResponse = Date.now()
return
}
if (event.data === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
return
}
if (
event.data instanceof ArrayBuffer &&
(event.data.byteLength === pingConst.length || event.data.byteLength === pongConst.length)
) {
const text = new TextDecoder().decode(event.data)
if (text === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
}
if (text === pongConst) {
this.pingResponse = Date.now()
}
return
}
if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => {
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
Expand Down Expand Up @@ -546,23 +573,30 @@ class Connection implements ClientConnection {
if (w instanceof Promise) {
await w
}
this.requests.set(id, promise)
if (data.method !== pingConst) {
this.requests.set(id, promise)
}
const sendData = (): void => {
if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
promise.startTime = Date.now()

const dta = ctx.withSync('serialize', {}, () =>
this.rpcHandler.serialize(
{
method: data.method,
params: data.params,
id,
time: Date.now()
},
this.binaryMode
if (data.method !== pingConst) {
const dta = ctx.withSync('serialize', {}, () =>
this.rpcHandler.serialize(
{
method: data.method,
params: data.params,
id,
time: Date.now()
},
this.binaryMode
)
)
)
ctx.withSync('send-data', {}, () => this.websocket?.send(dta))

ctx.withSync('send-data', {}, () => this.websocket?.send(dta))
} else {
this.websocket?.send(pingConst)
}
}
}
if (data.allowReconnect ?? true) {
Expand All @@ -579,7 +613,9 @@ class Connection implements ClientConnection {
sendData()
})
void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
return await promise.promise
if (data.method !== pingConst) {
return await promise.promise
}
})
}

Expand Down
3 changes: 3 additions & 0 deletions plugins/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ export type ClientFactory = (token: string, endpoint: string, opt?: ClientFactor
// ui - will filter out all server element's and all UI disabled elements.
export type FilterMode = 'none' | 'client' | 'ui'

export const pingConst = 'ping'
export const pongConst = 'pong!'

export default plugin(clientId, {
metadata: {
ClientSocketFactory: '' as Metadata<ClientSocketFactory>,
Expand Down
8 changes: 7 additions & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ export interface SessionRequest {
export interface ClientSessionCtx {
ctx: MeasureContext
sendResponse: (msg: any) => Promise<void>
sendPong: () => void
sendError: (msg: any, error: any) => Promise<void>
}

Expand Down Expand Up @@ -553,6 +554,8 @@ export interface ConnectionSocket {
isClosed: boolean
close: () => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void

sendPong: () => void
data: () => Record<string, any>

readRequest: (buffer: Buffer, binary: boolean) => Request<any>
Expand Down Expand Up @@ -664,7 +667,7 @@ export interface SessionManager {
ws: ConnectionSocket,
request: Request<any>,
workspace: string // wsId, toWorkspaceString()
) => void
) => Promise<void>
}

/**
Expand Down Expand Up @@ -692,3 +695,6 @@ export type ServerFactory = (
accountsUrl: string,
externalStorage: StorageAdapter
) => () => Promise<void>

export const pingConst = 'ping'
export const pongConst = 'pong!'
5 changes: 2 additions & 3 deletions server/server/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import core, {
import { PlatformError, unknownError } from '@hcengineering/platform'
import {
BackupClientOps,
SessionDataImpl,
createBroadcastEvent,
SessionDataImpl,
type ClientSessionCtx,
type ConnectionSocket,
type Pipeline,
Expand Down Expand Up @@ -98,9 +98,8 @@ export class ClientSession implements Session {
}

async ping (ctx: ClientSessionCtx): Promise<void> {
// console.log('ping')
this.lastRequest = Date.now()
await ctx.sendResponse('pong!')
ctx.sendPong()
}

async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {
Expand Down
19 changes: 12 additions & 7 deletions server/server/src/sessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import { Analytics } from '@hcengineering/analytics'
import core, {
TxFactory,
WorkspaceEvent,
cutObjectArray,
generateId,
isArchivingMode,
Expand All @@ -25,8 +23,10 @@ import core, {
isWorkspaceCreating,
systemAccountEmail,
toWorkspaceString,
TxFactory,
versionToString,
withContext,
WorkspaceEvent,
type BaseWorkspaceInfo,
type Branding,
type BrandingMap,
Expand All @@ -40,6 +40,7 @@ import { unknownError, type Status } from '@hcengineering/platform'
import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import {
LOGGING_ENABLED,
pingConst,
Pipeline,
PipelineFactory,
ServerFactory,
Expand Down Expand Up @@ -240,7 +241,7 @@ class TSessionManager implements SessionManager {
if (s[1].socket.checkState()) {
s[1].socket.send(
workspace.context,
{ result: 'ping' },
{ result: pingConst },
s[1].session.binaryMode,
s[1].session.useCompression
)
Expand Down Expand Up @@ -724,7 +725,8 @@ class TSessionManager implements SessionManager {
ctx,
sendError: async (msg, error: Status) => {
// Assume no error send
}
},
sendPong: () => {}
}

const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0]
Expand Down Expand Up @@ -933,7 +935,7 @@ class TSessionManager implements SessionManager {
ws: ConnectionSocket,
request: Request<any>,
workspace: string // wsId, toWorkspaceString()
): void {
): Promise<void> {
const backupMode = service.getMode() === 'backup'

const userCtx = requestCtx.newChild(
Expand All @@ -949,7 +951,7 @@ class TSessionManager implements SessionManager {
const reqId = generateId()

const st = Date.now()
void userCtx
return userCtx
.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
if (request.time != null) {
const delta = Date.now() - request.time
Expand Down Expand Up @@ -1024,6 +1026,9 @@ class TSessionManager implements SessionManager {
})
userCtx.end()
},
sendPong: () => {
ws.sendPong()
},
ctx,
sendError: async (msg, error: Status) => {
await sendResponse(ctx, service, ws, {
Expand Down Expand Up @@ -1137,7 +1142,7 @@ export function startSessionManager (
shutdown: opt.serverFactory(
sessions,
(rctx, service, ws, msg, workspace) => {
sessions.handleRequest(rctx, service, ws, msg, workspace)
void sessions.handleRequest(rctx, service, ws, msg, workspace)
},
ctx,
opt.pipelineFactory,
Expand Down
11 changes: 11 additions & 0 deletions server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
} from '@hcengineering/server'
import {
LOGGING_ENABLED,
pingConst,
pongConst,
type ConnectionSocket,
type HandleRequestFunction,
type PipelineFactory,
Expand Down Expand Up @@ -552,9 +554,18 @@ function createWebsocketClientSocket (
return true
},
readRequest: (buffer: Buffer, binary: boolean) => {
if (buffer.length === pingConst.length && buffer.toString() === pingConst) {
return { method: pingConst, params: [], id: -1, time: Date.now() }
}
return rpcHandler.readRequest(buffer, binary)
},
data: () => data,
sendPong: () => {
if (ws.readyState !== ws.OPEN || cs.isClosed) {
return
}
ws.send(pongConst)
},
send: (ctx: MeasureContext, msg, binary, compression) => {
const smsg = rpcHandler.serialize(msg, binary)

Expand Down
Loading

0 comments on commit bb6ee39

Please sign in to comment.