From 25e97215f7f36e12c88696b3318a0228ea8d1bf7 Mon Sep 17 00:00:00 2001 From: "M. E. Abdelsalam" Date: Sun, 15 Dec 2024 16:16:14 +0200 Subject: [PATCH 1/3] init: classes defined for managing wss connections. --- services/server/src/index.ts | 4 +- services/server/src/wss/handler.ts | 479 +-------------------- services/server/src/wss/handlers/call.ts | 97 +++++ services/server/src/wss/handlers/index.ts | 11 + services/server/src/wss/index.ts | 19 +- services/server/src/wss/oldhandler.ts | 485 ++++++++++++++++++++++ 6 files changed, 618 insertions(+), 477 deletions(-) create mode 100644 services/server/src/wss/handlers/call.ts create mode 100644 services/server/src/wss/handlers/index.ts create mode 100644 services/server/src/wss/oldhandler.ts diff --git a/services/server/src/index.ts b/services/server/src/index.ts index 42e75e93..2f20fc94 100644 --- a/services/server/src/index.ts +++ b/services/server/src/index.ts @@ -4,7 +4,6 @@ import express, { json } from "express"; import routes from "@/routes"; import { ghostConfig, jwtSecret, serverConfig } from "@/constants"; import { errorHandler } from "@/middleware/error"; -import { wssHandler } from "@/wss"; import bodyParser from "body-parser"; import cors from "cors"; import logger from "morgan"; @@ -16,6 +15,7 @@ import { authMiddleware, adminOnly } from "@litespace/auth"; import { isAllowedOrigin } from "@/lib/cors"; import { cache } from "@/lib/cache"; import "colors"; +import { wssConnectionHandler } from "./wss"; // Stablish connection with the redis cache. cache.connect(); @@ -33,7 +33,7 @@ io.engine.use( ) ); io.engine.use(onlyForHandshake(authorizeSocket)); -io.on("connection", wssHandler); +io.on("connection", wssConnectionHandler); app.use( logger(function (tokens, req, res) { diff --git a/services/server/src/wss/handler.ts b/services/server/src/wss/handler.ts index 8c178e42..9b786a44 100644 --- a/services/server/src/wss/handler.ts +++ b/services/server/src/wss/handler.ts @@ -1,362 +1,16 @@ -import { calls, messages, rooms, users } from "@litespace/models"; -import { ICall, IUser, Wss } from "@litespace/types"; +import { IUser, Wss } from "@litespace/types"; import { Socket } from "socket.io"; -import wss from "@/validation/wss"; -import zod from "zod"; -import { boolean, id, string, withNamedId } from "@/validation/utils"; -import { isEmpty } from "lodash"; -import { logger } from "@litespace/sol/log"; -import { safe } from "@litespace/sol/error"; -import { sanitizeMessage } from "@litespace/sol/chat"; -import "colors"; -import { isAdmin, isGhost, isStudent, isTutor, isUser } from "@litespace/auth"; -import { background } from "@/workers"; -import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; -import { cache } from "@/lib/cache"; -import { getGhostCall } from "@litespace/sol/ghost"; -import { canJoinCall } from "@/lib/call"; -import dayjs from "@/lib/dayjs"; -const peerPayload = zod.object({ callId: id, peerId: string }); -const updateMessagePayload = zod.object({ text: string, id }); -const toggleCameraPayload = zod.object({ call: id, camera: boolean }); -const toggleMicPayload = zod.object({ call: id, mic: boolean }); -const registerPeerPayload = zod.object({ peer: zod.string() }); -const userTypingPayload = zod.object({ - roomId: zod.number(), -}); - -const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; -const onJoinCallPayload = zod.object({ - callId: zod.number(), - type: zod.enum(callTypes), -}); -const onLeaveCallPayload = zod.object({ callId: zod.number() }); - -const stdout = logger("wss"); - -export class WssHandler { - socket: Socket; - user: IUser.Self | IUser.Ghost; +export abstract class WSSHandler { + protected readonly socket: Socket; + protected readonly user: IUser.Self | IUser.Ghost; constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { this.socket = socket; this.user = user; - this.initialize(); - } - - async initialize() { - this.connect(); - this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); - this.socket.on( - Wss.ClientEvent.UpdateMessage, - this.updateMessage.bind(this) - ); - this.socket.on( - Wss.ClientEvent.DeleteMessage, - this.deleteMessage.bind(this) - ); - this.socket.on(Wss.ClientEvent.PeerOpened, this.peerOpened.bind(this)); - this.socket.on(Wss.ClientEvent.RegisterPeer, this.registerPeer.bind(this)); - this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); - this.socket.on(Wss.ClientEvent.ToggleCamera, this.toggleCamera.bind(this)); - this.socket.on(Wss.ClientEvent.ToggleMic, this.toggleMic.bind(this)); - this.socket.on(Wss.ClientEvent.UserTyping, this.userTyping.bind(this)); - this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this)); - this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this)); - } - - /* - * This event listener will be called whenever a user - * joins a call. For instance, when - * users are joining a lesson, or a tutor is joining an - * interview. - */ - async onJoinCall(data: unknown) { - const result = await safe(async () => { - const { callId, type } = onJoinCallPayload.parse(data); - - const user = this.user; - // todo: add ghost as a member of the call - if (isGhost(user)) return stdout.warning("Unsupported"); - - stdout.info(`User ${user.id} is joining call ${callId}.`); - - const canJoin = await canJoinCall({ - userId: user.id, - callType: type, - callId, - }); - - if (!canJoin) throw Error("Forbidden"); - - // add user to the call by inserting row to call_members relation - await cache.call.addMember({ userId: user.id, callId: callId }); - this.socket.join(this.asCallRoomId(callId)); - - stdout.info(`User ${user.id} has joined call ${callId}.`); - - // TODO: retrieve user data (ICall.PopulatedMember) from the database - // discuss with the team if shall we retrieve it from postgres, - // or store it in redis in the first place. - - // notify members that a new member has joined the call - // NOTE: the user notifies himself as well that he successfully joined the call. - this.broadcast( - Wss.ServerEvent.MemberJoinedCall, - this.asCallRoomId(callId), - { userId: user.id } // TODO: define the payload struct type in the types package - ); - }); - if (result instanceof Error) stdout.error(result.message); - } - - /* - * This event listener will be called whenever a user - * leaves a call. For instance, when users are leaving - * a lesson, or a tutor is leaving an interview. - */ - async onLeaveCall(data: unknown) { - const result = await safe(async () => { - const { callId } = onLeaveCallPayload.parse(data); - - const user = this.user; - if (isGhost(user)) return; - - stdout.info(`User ${user.id} is leaving call ${callId}.`); - - // remove user from the call by deleting the corresponding row from call_members relation - await cache.call.removeMember({ - userId: user.id, - callId: callId, - }); - - stdout.info(`User ${user.id} has left call ${callId}.`); - - // notify members that a member has left the call - this.socket.broadcast - .to(this.asCallRoomId(callId)) - .emit(Wss.ServerEvent.MemberLeftCall, { - userId: user.id, - }); - }); - if (result instanceof Error) stdout.error(result.message); - } - - async joinRooms() { - const error = await safe(async () => { - const user = this.user; - if (isGhost(user)) return; - - const { list } = await rooms.findMemberRooms({ userId: user.id }); - - this.socket.join(list.map((roomId) => this.asChatRoomId(roomId))); - // private channel - this.socket.join(user.id.toString()); - - if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache); - if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats); - - // todo: get user calls from cache - const callsList = await calls.find({ - users: [user.id], - full: true, - after: dayjs.utc().startOf("day").toISOString(), - before: dayjs.utc().add(1, "day").toISOString(), - }); - - this.socket.join( - callsList.list.map((call) => this.asCallRoomId(call.id)) - ); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - /** - * @deprecated should be removed in favor of the new arch. - */ - async peerOpened(ids: unknown) { - const error = await safe(async () => { - const { callId, peerId } = peerPayload.parse(ids); - const user = this.user; - - const members = await calls.findCallMembers([callId]); - if (isEmpty(members)) return; - - const memberIds = members.map((member) => member.userId); - const isMember = isUser(user) && memberIds.includes(user.id); - const allowed = isMember || isGhost(this.user); - if (!allowed) return; - - this.socket.join(callId.toString()); - this.socket - .to(callId.toString()) - .emit(Wss.ServerEvent.UserJoinedCall, { peerId }); - }); - if (error instanceof Error) stdout.error(error.message); - } - - async registerPeer(data: unknown) { - const result = await safe(async () => { - const { peer } = registerPeerPayload.parse(data); - const user = this.user; - const id = isGhost(user) ? user : user.email; - stdout.info(`Register peer: ${peer} for ${id}`); - - if (isGhost(user)) - await cache.peer.setGhostPeerId(getGhostCall(user), peer); - if (isTutor(user)) await cache.peer.setUserPeerId(user.id, peer); - - // notify peers to refetch the peer id if needed - }); - if (result instanceof Error) stdout.error(result.message); - } - - async userTyping(data: unknown) { - const error = safe(async () => { - const { roomId } = userTypingPayload.parse(data); - - const user = this.user; - if (isGhost(user)) return; - - const members = await rooms.findRoomMembers({ roomIds: [roomId] }); - if (isEmpty(members)) return; - - const isMember = members.find((member) => member.id === user.id); - if (!isMember) - throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); - - this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { - roomId, - userId: user.id, - }); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - async sendMessage(data: unknown) { - const error = safe(async () => { - const user = this.user; - if (isGhost(user)) return; - - const { roomId, text } = wss.message.send.parse(data); - const userId = user.id; - - stdout.log(`u:${userId} is send a message to r:${roomId}`); - - const members = await rooms.findRoomMembers({ roomIds: [roomId] }); - if (!members) throw Error("Room not found"); - - const member = members.map((member) => member.id).includes(userId); - if (!member) throw new Error("Unauthorized"); - - const sanitized = sanitizeMessage(text); - if (!sanitized) return; // empty message - const message = await messages.create({ - text: sanitized, - userId, - roomId, - }); - - this.broadcast( - Wss.ServerEvent.RoomMessage, - this.asChatRoomId(roomId), - message - ); - }); - - if (error instanceof Error) stdout.error(error); } - async updateMessage(data: unknown) { - const error = await safe(async () => { - const user = this.user; - if (isGhost(user)) return; - - const { id, text } = updateMessagePayload.parse(data); - const message = await messages.findById(id); - if (!message || message.deleted) throw new Error("Message not found"); - - const owner = message.userId === user.id; - if (!owner) throw new Error("Forbidden"); - - const sanitized = sanitizeMessage(text); - if (!sanitized) throw new Error("Invalid message"); - - const updated = await messages.update(id, { text: sanitized }); - if (!updated) throw new Error("Mesasge not update; should never happen."); - - this.broadcast( - Wss.ServerEvent.RoomMessageUpdated, - this.asChatRoomId(message.roomId), - updated - ); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - async deleteMessage(data: unknown) { - const error = safe(async () => { - const user = this.user; - if (isGhost(user)) return; - - const { id }: { id: number } = withNamedId("id").parse(data); - - const message = await messages.findById(id); - if (!message || message.deleted) throw new Error("Message not found"); - - const owner = message.userId === user.id; - if (!owner) throw new Error("Forbidden"); - - await messages.markAsDeleted(id); - - this.broadcast( - Wss.ServerEvent.RoomMessageDeleted, - this.asChatRoomId(message.roomId), - { - roomId: message.roomId, - messageId: message.id, - } - ); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - async toggleCamera(data: unknown) { - const error = safe(async () => { - const user = this.user; - if (isGhost(user)) return; - const { call, camera } = toggleCameraPayload.parse(data); - // todo: add validation - this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), { - user: user.id, - camera, - }); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - async toggleMic(data: unknown) { - const error = safe(async () => { - const user = this.user; - if (isGhost(user)) return; - const { call, mic } = toggleMicPayload.parse(data); - // todo: add validation - this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), { - user: user.id, - mic, - }); - }); - - if (error instanceof Error) stdout.error(error.message); - } - - async broadcast( + protected async broadcast( event: T, room: string, ...data: Parameters @@ -364,127 +18,4 @@ export class WssHandler { this.socket.emit(event, ...data); this.socket.broadcast.to(room).emit(event, ...data); } - - async markMessageAsRead(data: unknown) { - try { - const user = this.user; - if (isGhost(user)) return; - - const messageId = wss.message.markMessageAsRead.parse(data).id; - const message = await messages.findById(messageId); - if (!message) throw new Error("Message not found"); - - const userId = user.id; - if (userId !== message.userId) throw new Error("Unauthorized"); - if (message.read) - return console.log("Message is already marked as read".yellow); - - await messages.markAsRead(messageId); - - this.socket.broadcast - .to(message.roomId.toString()) - .emit(Wss.ServerEvent.MessageRead, { messageId }); - } catch (error) { - console.log(error); - } - } - - async connect() { - const error = safe(async () => { - await this.online(); - await this.joinRooms(); - if (isAdmin(this.user)) this.emitServerStats(); - }); - if (error instanceof Error) stdout.error(error.message); - } - - async disconnect() { - const error = safe(async () => { - await this.offline(); - await this.deregisterPeer(); - await this.removeUserFromCalls(); - }); - if (error instanceof Error) stdout.error(error.message); - } - - async online() { - const user = this.user; - if (isGhost(user)) return; - const info = await users.update(user.id, { online: true }); - this.announceStatus(info); - } - - async offline() { - const user = this.user; - if (isGhost(user)) return; - const info = await users.update(user.id, { online: false }); - this.announceStatus(info); - } - - async emitServerStats() { - background.on("message", (message: PartentPortMessage) => { - if (message.type === PartentPortMessageType.Stats) - return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats); - }); - } - - /** - * Remove ghost and tutor peer id from the cache. - * - * @note should be called when the socket disconnects from the server. - */ - async deregisterPeer() { - // todo: notify peers that the current user got disconnected - const user = this.user; - const display = isGhost(user) ? user : user.email; - stdout.info(`Deregister peer for: ${display}`); - - if (isGhost(user)) - return await cache.peer.removeGhostPeerId(getGhostCall(user)); - if (isTutor(user)) await cache.peer.removeUserPeerId(user.id); - } - - async removeUserFromCalls() { - const user = this.user; - if (isGhost(user)) return; - - const callId = await cache.call.removeMemberByUserId(user.id); - if (!callId) return; - - // notify members that a member has left the call - this.socket.broadcast - .to(this.asCallRoomId(callId)) - .emit(Wss.ServerEvent.MemberLeftCall, { - userId: user.id, - }); - } - - async announceStatus(user: IUser.Self) { - const userRooms = await rooms.findMemberFullRoomIds(user.id); - - for (const room of userRooms) { - this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), { - online: user.online, - }); - } - } - - asCallRoomId(callId: number) { - return `call:${callId}`; - } - - asChatRoomId(roomId: number) { - return `room:${roomId}`; - } -} - -export function wssHandler(socket: Socket) { - const user = socket.request.user; - if (!user) { - stdout.warning( - "(function) wssHandler: No user has been found in the request obj!" - ); - return; - } - return new WssHandler(socket, user); } diff --git a/services/server/src/wss/handlers/call.ts b/services/server/src/wss/handlers/call.ts new file mode 100644 index 00000000..ddf1e2d0 --- /dev/null +++ b/services/server/src/wss/handlers/call.ts @@ -0,0 +1,97 @@ +import { cache } from "@/lib/cache"; +import { canJoinCall } from "@/lib/call"; +import { isGhost } from "@litespace/auth"; +import { logger, safe } from "@litespace/sol"; +import { ICall, Wss } from "@litespace/types"; +import zod from "zod"; +import { WSSHandler } from "../handler"; + +const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; +const stdout = logger("wss"); + +const onJoinCallPayload = zod.object({ callId: zod.number(), type: zod.enum(callTypes) }); +const onLeaveCallPayload = zod.object({ callId: zod.number() }); + +export class CallHandler extends WSSHandler { + /* + * This event listener will be called whenever a user + * joins a call. For instance, when + * users are joining a lesson, or a tutor is joining an + * interview. + */ + async onJoinCall(data: unknown) { + const result = await safe(async () => { + const { callId, type } = onJoinCallPayload.parse(data); + + const user = this.user; + // todo: add ghost as a member of the call + if (isGhost(user)) return stdout.warning("Unsupported"); + + stdout.info(`User ${user.id} is joining call ${callId}.`); + + const canJoin = await canJoinCall({ + userId: user.id, + callType: type, + callId, + }); + + if (!canJoin) throw Error("Forbidden"); + + // add user to the call by inserting row to call_members relation + await cache.call.addMember({ userId: user.id, callId: callId }); + this.socket.join(this.asCallRoomId(callId)); + + stdout.info(`User ${user.id} has joined call ${callId}.`); + + // TODO: retrieve user data (ICall.PopulatedMember) from the database + // discuss with the team if shall we retrieve it from postgres, + // or store it in redis in the first place. + + // notify members that a new member has joined the call + // NOTE: the user notifies himself as well that he successfully joined the call. + this.broadcast( + Wss.ServerEvent.MemberJoinedCall, + this.asCallRoomId(callId), + { userId: user.id } // TODO: define the payload struct type in the types package + ); + }); + if (result instanceof Error) stdout.error(result.message); + } + + /* + * This event listener will be called whenever a user + * leaves a call. For instance, when users are leaving + * a lesson, or a tutor is leaving an interview. + */ + async onLeaveCall(data: unknown) { + const result = await safe(async () => { + const { callId } = onLeaveCallPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + stdout.info(`User ${user.id} is leaving call ${callId}.`); + + // remove user from the call by deleting the corresponding row from call_members relation + await cache.call.removeMember({ + userId: user.id, + callId: callId, + }); + + stdout.info(`User ${user.id} has left call ${callId}.`); + + // notify members that a member has left the call + this.socket.broadcast + .to(this.asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + }); + if (result instanceof Error) stdout.error(result.message); + } + + private asCallRoomId(callId: number) { + return `call:${callId}`; + } +} + diff --git a/services/server/src/wss/handlers/index.ts b/services/server/src/wss/handlers/index.ts new file mode 100644 index 00000000..fb04c4ec --- /dev/null +++ b/services/server/src/wss/handlers/index.ts @@ -0,0 +1,11 @@ +import { Socket } from "socket.io"; +import { CallHandler } from "./call"; +import { IUser } from "@litespace/types"; + +export class WSSHandlers { + public readonly call: CallHandler; + + constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.call = new CallHandler(socket, user); + } +} diff --git a/services/server/src/wss/index.ts b/services/server/src/wss/index.ts index c9b8acfb..189ba082 100644 --- a/services/server/src/wss/index.ts +++ b/services/server/src/wss/index.ts @@ -1 +1,18 @@ -export { wssHandler } from "@/wss/handler"; +import { Socket } from "socket.io"; +import { WSSHandlers } from "./handlers"; +import { logger } from "@litespace/sol"; +import { Wss } from "@litespace/types"; + + +const stdout = logger("wss"); + +export function wssConnectionHandler(socket: Socket) { + const user = socket.request.user; + if (!user) { + stdout.warning("(function) wssHandler: No user has been found in the request obj!"); + return; + } + const handlers = new WSSHandlers(socket, user); + socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall); + socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall); +} diff --git a/services/server/src/wss/oldhandler.ts b/services/server/src/wss/oldhandler.ts new file mode 100644 index 00000000..0330a930 --- /dev/null +++ b/services/server/src/wss/oldhandler.ts @@ -0,0 +1,485 @@ +import { calls, messages, rooms, users } from "@litespace/models"; +import { ICall, IUser, Wss } from "@litespace/types"; +import { Socket } from "socket.io"; +import wss from "@/validation/wss"; +import zod from "zod"; +import { boolean, id, string, withNamedId } from "@/validation/utils"; +import { isEmpty } from "lodash"; +import { logger } from "@litespace/sol/log"; +import { safe } from "@litespace/sol/error"; +import { sanitizeMessage } from "@litespace/sol/chat"; +import "colors"; +import { isAdmin, isGhost, isStudent, isTutor, isUser } from "@litespace/auth"; +import { background } from "@/workers"; +import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; +import { cache } from "@/lib/cache"; +import { getGhostCall } from "@litespace/sol/ghost"; +import { canJoinCall } from "@/lib/call"; +import dayjs from "@/lib/dayjs"; + +const peerPayload = zod.object({ callId: id, peerId: string }); +const updateMessagePayload = zod.object({ text: string, id }); +const toggleCameraPayload = zod.object({ call: id, camera: boolean }); +const toggleMicPayload = zod.object({ call: id, mic: boolean }); +const registerPeerPayload = zod.object({ peer: zod.string() }); +const userTypingPayload = zod.object({ + roomId: zod.number(), +}); + +const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; +const onJoinCallPayload = zod.object({ + callId: zod.number(), + type: zod.enum(callTypes), +}); +const onLeaveCallPayload = zod.object({ callId: zod.number() }); + +const stdout = logger("wss"); + +export class WssHandler { + socket: Socket; + user: IUser.Self | IUser.Ghost; + + constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.socket = socket; + this.user = user; + this.initialize(); + } + + async initialize() { + this.connect(); + this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); + this.socket.on(Wss.ClientEvent.UpdateMessage, this.updateMessage.bind(this)); + this.socket.on(Wss.ClientEvent.DeleteMessage, this.deleteMessage.bind(this)); + this.socket.on(Wss.ClientEvent.PeerOpened, this.peerOpened.bind(this)); + this.socket.on(Wss.ClientEvent.RegisterPeer, this.registerPeer.bind(this)); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); + this.socket.on(Wss.ClientEvent.ToggleCamera, this.toggleCamera.bind(this)); + this.socket.on(Wss.ClientEvent.ToggleMic, this.toggleMic.bind(this)); + this.socket.on(Wss.ClientEvent.UserTyping, this.userTyping.bind(this)); + this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this)); + this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this)); + } + + /* + * This event listener will be called whenever a user + * joins a call. For instance, when + * users are joining a lesson, or a tutor is joining an + * interview. + */ + async onJoinCall(data: unknown) { + const result = await safe(async () => { + const { callId, type } = onJoinCallPayload.parse(data); + + const user = this.user; + // todo: add ghost as a member of the call + if (isGhost(user)) return stdout.warning("Unsupported"); + + stdout.info(`User ${user.id} is joining call ${callId}.`); + + const canJoin = await canJoinCall({ + userId: user.id, + callType: type, + callId, + }); + + if (!canJoin) throw Error("Forbidden"); + + // add user to the call by inserting row to call_members relation + await cache.call.addMember({ userId: user.id, callId: callId }); + this.socket.join(this.asCallRoomId(callId)); + + stdout.info(`User ${user.id} has joined call ${callId}.`); + + // TODO: retrieve user data (ICall.PopulatedMember) from the database + // discuss with the team if shall we retrieve it from postgres, + // or store it in redis in the first place. + + // notify members that a new member has joined the call + // NOTE: the user notifies himself as well that he successfully joined the call. + this.broadcast( + Wss.ServerEvent.MemberJoinedCall, + this.asCallRoomId(callId), + { userId: user.id } // TODO: define the payload struct type in the types package + ); + }); + if (result instanceof Error) stdout.error(result.message); + } + + /* + * This event listener will be called whenever a user + * leaves a call. For instance, when users are leaving + * a lesson, or a tutor is leaving an interview. + */ + async onLeaveCall(data: unknown) { + const result = await safe(async () => { + const { callId } = onLeaveCallPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + stdout.info(`User ${user.id} is leaving call ${callId}.`); + + // remove user from the call by deleting the corresponding row from call_members relation + await cache.call.removeMember({ + userId: user.id, + callId: callId, + }); + + stdout.info(`User ${user.id} has left call ${callId}.`); + + // notify members that a member has left the call + this.socket.broadcast + .to(this.asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + }); + if (result instanceof Error) stdout.error(result.message); + } + + async joinRooms() { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { list } = await rooms.findMemberRooms({ userId: user.id }); + + this.socket.join(list.map((roomId) => this.asChatRoomId(roomId))); + // private channel + this.socket.join(user.id.toString()); + + if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache); + if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats); + + // todo: get user calls from cache + const callsList = await calls.find({ + users: [user.id], + full: true, + after: dayjs.utc().startOf("day").toISOString(), + before: dayjs.utc().add(1, "day").toISOString(), + }); + + this.socket.join( + callsList.list.map((call) => this.asCallRoomId(call.id)) + ); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + /** + * @deprecated should be removed in favor of the new arch. + */ + async peerOpened(ids: unknown) { + const error = await safe(async () => { + const { callId, peerId } = peerPayload.parse(ids); + const user = this.user; + + const members = await calls.findCallMembers([callId]); + if (isEmpty(members)) return; + + const memberIds = members.map((member) => member.userId); + const isMember = isUser(user) && memberIds.includes(user.id); + const allowed = isMember || isGhost(this.user); + if (!allowed) return; + + this.socket.join(callId.toString()); + this.socket + .to(callId.toString()) + .emit(Wss.ServerEvent.UserJoinedCall, { peerId }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async registerPeer(data: unknown) { + const result = await safe(async () => { + const { peer } = registerPeerPayload.parse(data); + const user = this.user; + const id = isGhost(user) ? user : user.email; + stdout.info(`Register peer: ${peer} for ${id}`); + + if (isGhost(user)) + await cache.peer.setGhostPeerId(getGhostCall(user), peer); + if (isTutor(user)) await cache.peer.setUserPeerId(user.id, peer); + + // notify peers to refetch the peer id if needed + }); + if (result instanceof Error) stdout.error(result.message); + } + + async userTyping(data: unknown) { + const error = safe(async () => { + const { roomId } = userTypingPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (isEmpty(members)) return; + + const isMember = members.find((member) => member.id === user.id); + if (!isMember) + throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); + + this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { + roomId, + userId: user.id, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + async sendMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { roomId, text } = wss.message.send.parse(data); + const userId = user.id; + + stdout.log(`u:${userId} is send a message to r:${roomId}`); + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (!members) throw Error("Room not found"); + + const member = members.map((member) => member.id).includes(userId); + if (!member) throw new Error("Unauthorized"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) return; // empty message + const message = await messages.create({ + text: sanitized, + userId, + roomId, + }); + + this.broadcast( + Wss.ServerEvent.RoomMessage, + this.asChatRoomId(roomId), + message + ); + }); + + if (error instanceof Error) stdout.error(error); + } + + async updateMessage(data: unknown) { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id, text } = updateMessagePayload.parse(data); + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) throw new Error("Invalid message"); + + const updated = await messages.update(id, { text: sanitized }); + if (!updated) throw new Error("Mesasge not update; should never happen."); + + this.broadcast( + Wss.ServerEvent.RoomMessageUpdated, + this.asChatRoomId(message.roomId), + updated + ); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + async deleteMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id }: { id: number } = withNamedId("id").parse(data); + + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + await messages.markAsDeleted(id); + + this.broadcast( + Wss.ServerEvent.RoomMessageDeleted, + this.asChatRoomId(message.roomId), + { + roomId: message.roomId, + messageId: message.id, + } + ); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + async toggleCamera(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, camera } = toggleCameraPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), { + user: user.id, + camera, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + async toggleMic(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, mic } = toggleMicPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), { + user: user.id, + mic, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + async broadcast( + event: T, + room: string, + ...data: Parameters + ) { + this.socket.emit(event, ...data); + this.socket.broadcast.to(room).emit(event, ...data); + } + + async markMessageAsRead(data: unknown) { + try { + const user = this.user; + if (isGhost(user)) return; + + const messageId = wss.message.markMessageAsRead.parse(data).id; + const message = await messages.findById(messageId); + if (!message) throw new Error("Message not found"); + + const userId = user.id; + if (userId !== message.userId) throw new Error("Unauthorized"); + if (message.read) + return console.log("Message is already marked as read".yellow); + + await messages.markAsRead(messageId); + + this.socket.broadcast + .to(message.roomId.toString()) + .emit(Wss.ServerEvent.MessageRead, { messageId }); + } catch (error) { + console.log(error); + } + } + + async connect() { + const error = safe(async () => { + await this.online(); + await this.joinRooms(); + if (isAdmin(this.user)) this.emitServerStats(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async disconnect() { + const error = safe(async () => { + await this.offline(); + await this.deregisterPeer(); + await this.removeUserFromCalls(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async online() { + const user = this.user; + if (isGhost(user)) return; + const info = await users.update(user.id, { online: true }); + this.announceStatus(info); + } + + async offline() { + const user = this.user; + if (isGhost(user)) return; + const info = await users.update(user.id, { online: false }); + this.announceStatus(info); + } + + async emitServerStats() { + background.on("message", (message: PartentPortMessage) => { + if (message.type === PartentPortMessageType.Stats) + return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats); + }); + } + + /** + * Remove ghost and tutor peer id from the cache. + * + * @note should be called when the socket disconnects from the server. + */ + async deregisterPeer() { + // todo: notify peers that the current user got disconnected + const user = this.user; + const display = isGhost(user) ? user : user.email; + stdout.info(`Deregister peer for: ${display}`); + + if (isGhost(user)) + return await cache.peer.removeGhostPeerId(getGhostCall(user)); + if (isTutor(user)) await cache.peer.removeUserPeerId(user.id); + } + + async removeUserFromCalls() { + const user = this.user; + if (isGhost(user)) return; + + const callId = await cache.call.removeMemberByUserId(user.id); + if (!callId) return; + + // notify members that a member has left the call + this.socket.broadcast + .to(this.asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + } + + async announceStatus(user: IUser.Self) { + const userRooms = await rooms.findMemberFullRoomIds(user.id); + + for (const room of userRooms) { + this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), { + online: user.online, + }); + } + } + + asCallRoomId(callId: number) { + return `call:${callId}`; + } + + asChatRoomId(roomId: number) { + return `room:${roomId}`; + } +} + +export function wssHandler(socket: Socket) { + const user = socket.request.user; + if (!user) { + stdout.warning( + "(function) wssHandler: No user has been found in the request obj!" + ); + return; + } + return new WssHandler(socket, user); +} + From 8010a5f58561bed4ab2bf24f893123b5adc82041 Mon Sep 17 00:00:00 2001 From: "M. E. Abdelsalam" Date: Sun, 15 Dec 2024 22:01:34 +0200 Subject: [PATCH 2/3] chore: wss server handler has been refactored. --- .../src/wss/{handler.ts => handlers/base.ts} | 0 services/server/src/wss/handlers/call.ts | 15 +-- .../server/src/wss/handlers/connection.ts | 121 ++++++++++++++++++ services/server/src/wss/handlers/index.ts | 15 ++- .../server/src/wss/handlers/inputDevices.ts | 68 ++++++++++ services/server/src/wss/handlers/message.ts | 103 +++++++++++++++ services/server/src/wss/handlers/peer.ts | 56 ++++++++ services/server/src/wss/index.ts | 20 ++- services/server/src/wss/oldhandler.ts | 5 + services/server/src/wss/utils.ts | 7 + 10 files changed, 398 insertions(+), 12 deletions(-) rename services/server/src/wss/{handler.ts => handlers/base.ts} (100%) create mode 100644 services/server/src/wss/handlers/connection.ts create mode 100644 services/server/src/wss/handlers/inputDevices.ts create mode 100644 services/server/src/wss/handlers/message.ts create mode 100644 services/server/src/wss/handlers/peer.ts create mode 100644 services/server/src/wss/utils.ts diff --git a/services/server/src/wss/handler.ts b/services/server/src/wss/handlers/base.ts similarity index 100% rename from services/server/src/wss/handler.ts rename to services/server/src/wss/handlers/base.ts diff --git a/services/server/src/wss/handlers/call.ts b/services/server/src/wss/handlers/call.ts index ddf1e2d0..07335ae4 100644 --- a/services/server/src/wss/handlers/call.ts +++ b/services/server/src/wss/handlers/call.ts @@ -3,8 +3,10 @@ import { canJoinCall } from "@/lib/call"; import { isGhost } from "@litespace/auth"; import { logger, safe } from "@litespace/sol"; import { ICall, Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; + import zod from "zod"; -import { WSSHandler } from "../handler"; +import { asCallRoomId } from "../utils"; const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; const stdout = logger("wss"); @@ -25,6 +27,7 @@ export class CallHandler extends WSSHandler { const user = this.user; // todo: add ghost as a member of the call + if (!user) return stdout.error("user undefined!"); if (isGhost(user)) return stdout.warning("Unsupported"); stdout.info(`User ${user.id} is joining call ${callId}.`); @@ -39,7 +42,7 @@ export class CallHandler extends WSSHandler { // add user to the call by inserting row to call_members relation await cache.call.addMember({ userId: user.id, callId: callId }); - this.socket.join(this.asCallRoomId(callId)); + this.socket.join(asCallRoomId(callId)); stdout.info(`User ${user.id} has joined call ${callId}.`); @@ -51,7 +54,7 @@ export class CallHandler extends WSSHandler { // NOTE: the user notifies himself as well that he successfully joined the call. this.broadcast( Wss.ServerEvent.MemberJoinedCall, - this.asCallRoomId(callId), + asCallRoomId(callId), { userId: user.id } // TODO: define the payload struct type in the types package ); }); @@ -82,16 +85,12 @@ export class CallHandler extends WSSHandler { // notify members that a member has left the call this.socket.broadcast - .to(this.asCallRoomId(callId)) + .to(asCallRoomId(callId)) .emit(Wss.ServerEvent.MemberLeftCall, { userId: user.id, }); }); if (result instanceof Error) stdout.error(result.message); } - - private asCallRoomId(callId: number) { - return `call:${callId}`; - } } diff --git a/services/server/src/wss/handlers/connection.ts b/services/server/src/wss/handlers/connection.ts new file mode 100644 index 00000000..6a50904f --- /dev/null +++ b/services/server/src/wss/handlers/connection.ts @@ -0,0 +1,121 @@ +import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth"; +import { dayjs, logger, safe } from "@litespace/sol"; +import { IUser, Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; +import { calls, rooms, users } from "@litespace/models"; +import { background } from "@/workers"; +import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; +import { asCallRoomId, asChatRoomId } from "@/wss/utils"; +import { cache } from "@/lib/cache"; +import { getGhostCall } from "@litespace/sol/ghost"; + +const stdout = logger("wss"); + +export class ConnectionHandler extends WSSHandler { + async connect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: true }); + this.announceStatus(info); + + await this.joinRooms(); + if (isAdmin(this.user)) this.emitServerStats(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async disconnect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: false }); + this.announceStatus(info); + + await this.deregisterPeer(); + await this.removeUserFromCalls(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + private async announceStatus(user: IUser.Self) { + const userRooms = await rooms.findMemberFullRoomIds(user.id); + + for (const room of userRooms) { + this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), { + online: user.online, + }); + } + } + + private async emitServerStats() { + background.on("message", (message: PartentPortMessage) => { + if (message.type === PartentPortMessageType.Stats) + return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats); + }); + } + + private async joinRooms() { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { list } = await rooms.findMemberRooms({ userId: user.id }); + + this.socket.join(list.map((roomId) => asChatRoomId(roomId))); + // private channel + this.socket.join(user.id.toString()); + + if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache); + if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats); + + // todo: get user calls from cache + const callsList = await calls.find({ + users: [user.id], + full: true, + after: dayjs.utc().startOf("day").toISOString(), + before: dayjs.utc().add(1, "day").toISOString(), + }); + + this.socket.join( + callsList.list.map((call) => asCallRoomId(call.id)) + ); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + /** + * Remove ghost and tutor peer id from the cache. + * + * @note should be called when the socket disconnects from the server. + */ + private async deregisterPeer() { + // todo: notify peers that the current user got disconnected + const user = this.user; + const display = isGhost(user) ? user : user.email; + stdout.info(`Deregister peer for: ${display}`); + + if (isGhost(user)) + return await cache.peer.removeGhostPeerId(getGhostCall(user)); + if (isTutor(user)) await cache.peer.removeUserPeerId(user.id); + } + + private async removeUserFromCalls() { + const user = this.user; + if (isGhost(user)) return; + + const callId = await cache.call.removeMemberByUserId(user.id); + if (!callId) return; + + // notify members that a member has left the call + this.socket.broadcast + .to(asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + } +} + diff --git a/services/server/src/wss/handlers/index.ts b/services/server/src/wss/handlers/index.ts index fb04c4ec..14e474a4 100644 --- a/services/server/src/wss/handlers/index.ts +++ b/services/server/src/wss/handlers/index.ts @@ -1,11 +1,24 @@ import { Socket } from "socket.io"; -import { CallHandler } from "./call"; import { IUser } from "@litespace/types"; +import { CallHandler } from "./call"; +import { ConnectionHandler } from "./connection"; +import { MessageHandler } from "./message"; +import { PeerHandler } from "./peer"; +import { InputDevicesHandler } from "./inputDevices"; + export class WSSHandlers { + public readonly connection: ConnectionHandler; public readonly call: CallHandler; + public readonly message: MessageHandler; + public readonly peer: PeerHandler; + public readonly inputDevices: InputDevicesHandler; constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.connection = new ConnectionHandler(socket, user); this.call = new CallHandler(socket, user); + this.message = new MessageHandler(socket, user); + this.peer = new PeerHandler(socket, user); + this.inputDevices = new InputDevicesHandler(socket, user); } } diff --git a/services/server/src/wss/handlers/inputDevices.ts b/services/server/src/wss/handlers/inputDevices.ts new file mode 100644 index 00000000..aba00a7d --- /dev/null +++ b/services/server/src/wss/handlers/inputDevices.ts @@ -0,0 +1,68 @@ +import { logger, safe } from "@litespace/sol"; +import { rooms } from "@litespace/models"; +import { isGhost } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { id, boolean } from "@/validation/utils"; +import { WSSHandler } from "./base"; + +import zod from "zod"; +import { isEmpty } from "lodash"; + +const toggleCameraPayload = zod.object({ call: id, camera: boolean }); +const toggleMicPayload = zod.object({ call: id, mic: boolean }); +const userTypingPayload = zod.object({ roomId: zod.number() }); + +const stdout = logger("wss"); + +export class InputDevicesHandler extends WSSHandler { + async toggleCamera(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, camera } = toggleCameraPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), { + user: user.id, + camera, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async toggleMic(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, mic } = toggleMicPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), { + user: user.id, + mic, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async userTyping(data: unknown) { + const error = safe(async () => { + const { roomId } = userTypingPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (isEmpty(members)) return; + + const isMember = members.find((member) => member.id === user.id); + if (!isMember) + throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); + + this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { + roomId, + userId: user.id, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/message.ts b/services/server/src/wss/handlers/message.ts new file mode 100644 index 00000000..8d8b912e --- /dev/null +++ b/services/server/src/wss/handlers/message.ts @@ -0,0 +1,103 @@ +import { isGhost } from "@litespace/auth"; +import { logger, safe, sanitizeMessage } from "@litespace/sol"; +import { Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; +import { messages, rooms } from "@litespace/models"; +import { asChatRoomId } from "@/wss/utils"; +import wss from "@/validation/wss"; + +import zod from "zod"; +import { id, string, withNamedId } from "@/validation/utils"; + +const stdout = logger("wss"); + +const updateMessagePayload = zod.object({ text: string, id }); + +export class MessageHandler extends WSSHandler { + async sendMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { roomId, text } = wss.message.send.parse(data); + const userId = user.id; + + stdout.log(`u:${userId} is send a message to r:${roomId}`); + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (!members) throw Error("Room not found"); + + const member = members.map((member) => member.id).includes(userId); + if (!member) throw new Error("Unauthorized"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) return; // empty message + const message = await messages.create({ + text: sanitized, + userId, + roomId, + }); + + this.broadcast( + Wss.ServerEvent.RoomMessage, + asChatRoomId(roomId), + message + ); + }); + if (error instanceof Error) stdout.error(error); + } + + async updateMessage(data: unknown) { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id, text } = updateMessagePayload.parse(data); + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) throw new Error("Invalid message"); + + const updated = await messages.update(id, { text: sanitized }); + if (!updated) throw new Error("Mesasge not update; should never happen."); + + this.broadcast( + Wss.ServerEvent.RoomMessageUpdated, + asChatRoomId(message.roomId), + updated + ); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async deleteMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id }: { id: number } = withNamedId("id").parse(data); + + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + await messages.markAsDeleted(id); + + this.broadcast( + Wss.ServerEvent.RoomMessageDeleted, + asChatRoomId(message.roomId), + { + roomId: message.roomId, + messageId: message.id, + } + ); + }); + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/peer.ts b/services/server/src/wss/handlers/peer.ts new file mode 100644 index 00000000..efaed222 --- /dev/null +++ b/services/server/src/wss/handlers/peer.ts @@ -0,0 +1,56 @@ +import { logger, safe } from "@litespace/sol"; +import { calls } from "@litespace/models"; +import { isGhost, isTutor, isUser } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { getGhostCall } from "@litespace/sol/ghost"; +import { cache } from "@/lib/cache"; +import { id, string } from "@/validation/utils"; +import { WSSHandler } from "./base"; + +import zod from "zod"; +import { isEmpty } from "lodash"; + +const peerPayload = zod.object({ callId: id, peerId: string }); +const registerPeerPayload = zod.object({ peer: zod.string() }); + +const stdout = logger("wss"); + +export class PeerHandler extends WSSHandler { + async peerOpened(data: unknown) { + const error = await safe(async () => { + const { callId, peerId } = peerPayload.parse(data); + const user = this.user; + + const members = await calls.findCallMembers([callId]); + if (isEmpty(members)) return; + + const memberIds = members.map((member) => member.userId); + const isMember = isUser(user) && memberIds.includes(user.id); + const allowed = isMember || isGhost(this.user); + if (!allowed) return; + + this.socket.join(callId.toString()); + this.socket + .to(callId.toString()) + .emit(Wss.ServerEvent.UserJoinedCall, { peerId }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async registerPeer(data: unknown) { + const result = await safe(async () => { + const { peer } = registerPeerPayload.parse(data); + const user = this.user; + const id = isGhost(user) ? user : user.email; + stdout.info(`Register peer: ${peer} for ${id}`); + + if (isGhost(user)) + await cache.peer.setGhostPeerId(getGhostCall(user), peer); + if (isTutor(user)) + await cache.peer.setUserPeerId(user.id, peer); + + // notify peers to refetch the peer id if needed + }); + if (result instanceof Error) stdout.error(result.message); + } +} diff --git a/services/server/src/wss/index.ts b/services/server/src/wss/index.ts index 189ba082..8d6f168d 100644 --- a/services/server/src/wss/index.ts +++ b/services/server/src/wss/index.ts @@ -3,7 +3,6 @@ import { WSSHandlers } from "./handlers"; import { logger } from "@litespace/sol"; import { Wss } from "@litespace/types"; - const stdout = logger("wss"); export function wssConnectionHandler(socket: Socket) { @@ -13,6 +12,21 @@ export function wssConnectionHandler(socket: Socket) { return; } const handlers = new WSSHandlers(socket, user); - socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall); - socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall); + + handlers.connection.connect(); + socket.on(Wss.ClientEvent.Disconnect, handlers.connection.disconnect.bind(handlers.connection)); + + socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall.bind(handlers.call)); + socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall.bind(handlers.call)); + + socket.on(Wss.ClientEvent.SendMessage, handlers.message.sendMessage.bind(handlers.message)); + socket.on(Wss.ClientEvent.UpdateMessage, handlers.message.updateMessage.bind(handlers.message)); + socket.on(Wss.ClientEvent.DeleteMessage, handlers.message.deleteMessage.bind(handlers.message)); + + socket.on(Wss.ClientEvent.PeerOpened, handlers.peer.peerOpened.bind(handlers.peer)); + socket.on(Wss.ClientEvent.RegisterPeer, handlers.peer.registerPeer.bind(handlers.peer)); + + socket.on(Wss.ClientEvent.ToggleCamera, handlers.inputDevices.toggleCamera.bind(handlers.inputDevices)); + socket.on(Wss.ClientEvent.ToggleMic, handlers.inputDevices.toggleMic.bind(handlers.inputDevices)); + socket.on(Wss.ClientEvent.UserTyping, handlers.inputDevices.userTyping.bind(handlers.inputDevices)); } diff --git a/services/server/src/wss/oldhandler.ts b/services/server/src/wss/oldhandler.ts index 0330a930..44be52fc 100644 --- a/services/server/src/wss/oldhandler.ts +++ b/services/server/src/wss/oldhandler.ts @@ -35,6 +35,9 @@ const onLeaveCallPayload = zod.object({ callId: zod.number() }); const stdout = logger("wss"); +/** + * @deprecated + */ export class WssHandler { socket: Socket; user: IUser.Self | IUser.Ghost; @@ -50,8 +53,10 @@ export class WssHandler { this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); this.socket.on(Wss.ClientEvent.UpdateMessage, this.updateMessage.bind(this)); this.socket.on(Wss.ClientEvent.DeleteMessage, this.deleteMessage.bind(this)); + this.socket.on(Wss.ClientEvent.PeerOpened, this.peerOpened.bind(this)); this.socket.on(Wss.ClientEvent.RegisterPeer, this.registerPeer.bind(this)); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); this.socket.on(Wss.ClientEvent.ToggleCamera, this.toggleCamera.bind(this)); this.socket.on(Wss.ClientEvent.ToggleMic, this.toggleMic.bind(this)); diff --git a/services/server/src/wss/utils.ts b/services/server/src/wss/utils.ts new file mode 100644 index 00000000..86a11d9e --- /dev/null +++ b/services/server/src/wss/utils.ts @@ -0,0 +1,7 @@ +export function asCallRoomId(callId: number) { + return `call:${callId}`; +} + +export function asChatRoomId(roomId: number) { + return `room:${roomId}`; +} From 0d4d588850569cefe769fa7385ba04cfb623d8ab Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 15 Dec 2024 23:14:48 +0200 Subject: [PATCH 3/3] refactor(server): refactor wss handler --- .DS_Store | Bin 0 -> 6148 bytes services/server/src/index.ts | 4 +- services/server/src/wss/handlers/base.ts | 2 +- services/server/src/wss/handlers/call.ts | 18 +++++-- .../server/src/wss/handlers/connection.ts | 15 +++--- services/server/src/wss/handlers/index.ts | 32 ++++++------ .../server/src/wss/handlers/inputDevices.ts | 42 +++++----------- services/server/src/wss/handlers/message.ts | 46 ++++++++++++++++-- services/server/src/wss/handlers/peer.ts | 11 +++-- services/server/src/wss/index.ts | 32 +++--------- 10 files changed, 109 insertions(+), 93 deletions(-) create mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..7c3f1724bf2f4412451ae1565a20434826bc8467 GIT binary patch literal 6148 zcmeHKI|>3Z5S>vG!N$@uSMUZw^aOhWLB&QC6s@=NTprCgpGH?ZZR8D1UNV`NkXP*N zh=|TFo0-T&L`HB!x!KS)+c)o6FCz+si0p zwti-h{ zq5pp-aYY5Fz+Wk#gGIBL<4IXtJCCzkTi`3W<=o+Bm^%f7mt&xpV=Sy3PdzE}ip{ZK V6Wc(iBkpt{e+En!8Ws4p0uPzK6_fw~ literal 0 HcmV?d00001 diff --git a/services/server/src/index.ts b/services/server/src/index.ts index 2f20fc94..42e75e93 100644 --- a/services/server/src/index.ts +++ b/services/server/src/index.ts @@ -4,6 +4,7 @@ import express, { json } from "express"; import routes from "@/routes"; import { ghostConfig, jwtSecret, serverConfig } from "@/constants"; import { errorHandler } from "@/middleware/error"; +import { wssHandler } from "@/wss"; import bodyParser from "body-parser"; import cors from "cors"; import logger from "morgan"; @@ -15,7 +16,6 @@ import { authMiddleware, adminOnly } from "@litespace/auth"; import { isAllowedOrigin } from "@/lib/cors"; import { cache } from "@/lib/cache"; import "colors"; -import { wssConnectionHandler } from "./wss"; // Stablish connection with the redis cache. cache.connect(); @@ -33,7 +33,7 @@ io.engine.use( ) ); io.engine.use(onlyForHandshake(authorizeSocket)); -io.on("connection", wssConnectionHandler); +io.on("connection", wssHandler); app.use( logger(function (tokens, req, res) { diff --git a/services/server/src/wss/handlers/base.ts b/services/server/src/wss/handlers/base.ts index 9b786a44..5647ab1b 100644 --- a/services/server/src/wss/handlers/base.ts +++ b/services/server/src/wss/handlers/base.ts @@ -1,7 +1,7 @@ import { IUser, Wss } from "@litespace/types"; import { Socket } from "socket.io"; -export abstract class WSSHandler { +export abstract class WssHandler { protected readonly socket: Socket; protected readonly user: IUser.Self | IUser.Ghost; diff --git a/services/server/src/wss/handlers/call.ts b/services/server/src/wss/handlers/call.ts index 07335ae4..cbb07ed2 100644 --- a/services/server/src/wss/handlers/call.ts +++ b/services/server/src/wss/handlers/call.ts @@ -3,18 +3,27 @@ import { canJoinCall } from "@/lib/call"; import { isGhost } from "@litespace/auth"; import { logger, safe } from "@litespace/sol"; import { ICall, Wss } from "@litespace/types"; -import { WSSHandler } from "@/wss/handlers/base"; +import { WssHandler } from "@/wss/handlers/base"; +import { asCallRoomId } from "@/wss/utils"; import zod from "zod"; -import { asCallRoomId } from "../utils"; const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; const stdout = logger("wss"); -const onJoinCallPayload = zod.object({ callId: zod.number(), type: zod.enum(callTypes) }); +const onJoinCallPayload = zod.object({ + callId: zod.number(), + type: zod.enum(callTypes), +}); const onLeaveCallPayload = zod.object({ callId: zod.number() }); -export class CallHandler extends WSSHandler { +export class Call extends WssHandler { + public init(): Call { + this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this)); + this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this)); + return this; + } + /* * This event listener will be called whenever a user * joins a call. For instance, when @@ -93,4 +102,3 @@ export class CallHandler extends WSSHandler { if (result instanceof Error) stdout.error(result.message); } } - diff --git a/services/server/src/wss/handlers/connection.ts b/services/server/src/wss/handlers/connection.ts index 6a50904f..16c677d9 100644 --- a/services/server/src/wss/handlers/connection.ts +++ b/services/server/src/wss/handlers/connection.ts @@ -1,7 +1,7 @@ import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth"; import { dayjs, logger, safe } from "@litespace/sol"; import { IUser, Wss } from "@litespace/types"; -import { WSSHandler } from "@/wss/handlers/base"; +import { WssHandler } from "@/wss/handlers/base"; import { calls, rooms, users } from "@litespace/models"; import { background } from "@/workers"; import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; @@ -11,7 +11,13 @@ import { getGhostCall } from "@litespace/sol/ghost"; const stdout = logger("wss"); -export class ConnectionHandler extends WSSHandler { +export class Connection extends WssHandler { + public init(): Connection { + this.connect(); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); + return this; + } + async connect() { const error = safe(async () => { const user = this.user; @@ -79,9 +85,7 @@ export class ConnectionHandler extends WSSHandler { before: dayjs.utc().add(1, "day").toISOString(), }); - this.socket.join( - callsList.list.map((call) => asCallRoomId(call.id)) - ); + this.socket.join(callsList.list.map((call) => asCallRoomId(call.id))); }); if (error instanceof Error) stdout.error(error.message); @@ -118,4 +122,3 @@ export class ConnectionHandler extends WSSHandler { }); } } - diff --git a/services/server/src/wss/handlers/index.ts b/services/server/src/wss/handlers/index.ts index 14e474a4..e72b2c44 100644 --- a/services/server/src/wss/handlers/index.ts +++ b/services/server/src/wss/handlers/index.ts @@ -1,24 +1,24 @@ import { Socket } from "socket.io"; import { IUser } from "@litespace/types"; -import { CallHandler } from "./call"; -import { ConnectionHandler } from "./connection"; -import { MessageHandler } from "./message"; -import { PeerHandler } from "./peer"; -import { InputDevicesHandler } from "./inputDevices"; +import { Call } from "@/wss/handlers/call"; +import { Connection } from "@/wss/handlers/connection"; +import { Messages } from "@/wss/handlers/message"; +import { Peer } from "@/wss/handlers/peer"; +import { InputDevices } from "./inputDevices"; -export class WSSHandlers { - public readonly connection: ConnectionHandler; - public readonly call: CallHandler; - public readonly message: MessageHandler; - public readonly peer: PeerHandler; - public readonly inputDevices: InputDevicesHandler; +export class WssHandlers { + public readonly connection: Connection; + public readonly call: Call; + public readonly messages: Messages; + public readonly peer: Peer; + public readonly inputDevices: InputDevices; constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { - this.connection = new ConnectionHandler(socket, user); - this.call = new CallHandler(socket, user); - this.message = new MessageHandler(socket, user); - this.peer = new PeerHandler(socket, user); - this.inputDevices = new InputDevicesHandler(socket, user); + this.connection = new Connection(socket, user).init(); + this.call = new Call(socket, user).init(); + this.messages = new Messages(socket, user).init(); + this.peer = new Peer(socket, user); + this.inputDevices = new InputDevices(socket, user).init(); } } diff --git a/services/server/src/wss/handlers/inputDevices.ts b/services/server/src/wss/handlers/inputDevices.ts index aba00a7d..c136b2a2 100644 --- a/services/server/src/wss/handlers/inputDevices.ts +++ b/services/server/src/wss/handlers/inputDevices.ts @@ -3,19 +3,26 @@ import { rooms } from "@litespace/models"; import { isGhost } from "@litespace/auth"; import { Wss } from "@litespace/types"; import { id, boolean } from "@/validation/utils"; -import { WSSHandler } from "./base"; - +import { WssHandler } from "@/wss/handlers/base"; import zod from "zod"; import { isEmpty } from "lodash"; const toggleCameraPayload = zod.object({ call: id, camera: boolean }); const toggleMicPayload = zod.object({ call: id, mic: boolean }); -const userTypingPayload = zod.object({ roomId: zod.number() }); const stdout = logger("wss"); -export class InputDevicesHandler extends WSSHandler { - async toggleCamera(data: unknown) { +export class InputDevices extends WssHandler { + public init(): InputDevices { + this.socket.on( + Wss.ClientEvent.ToggleCamera, + this.onToggleCamera.bind(this) + ); + this.socket.on(Wss.ClientEvent.ToggleMic, this.onToggleMic.bind(this)); + return this; + } + + async onToggleCamera(data: unknown) { const error = safe(async () => { const user = this.user; if (isGhost(user)) return; @@ -29,7 +36,7 @@ export class InputDevicesHandler extends WSSHandler { if (error instanceof Error) stdout.error(error.message); } - async toggleMic(data: unknown) { + async onToggleMic(data: unknown) { const error = safe(async () => { const user = this.user; if (isGhost(user)) return; @@ -42,27 +49,4 @@ export class InputDevicesHandler extends WSSHandler { }); if (error instanceof Error) stdout.error(error.message); } - - async userTyping(data: unknown) { - const error = safe(async () => { - const { roomId } = userTypingPayload.parse(data); - - const user = this.user; - if (isGhost(user)) return; - - const members = await rooms.findRoomMembers({ roomIds: [roomId] }); - if (isEmpty(members)) return; - - const isMember = members.find((member) => member.id === user.id); - if (!isMember) - throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); - - this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { - roomId, - userId: user.id, - }); - }); - - if (error instanceof Error) stdout.error(error.message); - } } diff --git a/services/server/src/wss/handlers/message.ts b/services/server/src/wss/handlers/message.ts index 8d8b912e..212369fd 100644 --- a/services/server/src/wss/handlers/message.ts +++ b/services/server/src/wss/handlers/message.ts @@ -1,19 +1,34 @@ import { isGhost } from "@litespace/auth"; import { logger, safe, sanitizeMessage } from "@litespace/sol"; import { Wss } from "@litespace/types"; -import { WSSHandler } from "@/wss/handlers/base"; +import { WssHandler } from "@/wss/handlers/base"; import { messages, rooms } from "@litespace/models"; import { asChatRoomId } from "@/wss/utils"; +import { id, string, withNamedId } from "@/validation/utils"; import wss from "@/validation/wss"; - import zod from "zod"; -import { id, string, withNamedId } from "@/validation/utils"; +import { isEmpty } from "lodash"; const stdout = logger("wss"); const updateMessagePayload = zod.object({ text: string, id }); +const userTypingPayload = zod.object({ roomId: zod.number() }); + +export class Messages extends WssHandler { + public init(): Messages { + this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); + this.socket.on( + Wss.ClientEvent.UpdateMessage, + this.updateMessage.bind(this) + ); + this.socket.on( + Wss.ClientEvent.DeleteMessage, + this.deleteMessage.bind(this) + ); + this.socket.on(Wss.ClientEvent.UserTyping, this.onUserTyping.bind(this)); + return this; + } -export class MessageHandler extends WSSHandler { async sendMessage(data: unknown) { const error = safe(async () => { const user = this.user; @@ -100,4 +115,27 @@ export class MessageHandler extends WSSHandler { }); if (error instanceof Error) stdout.error(error.message); } + + async onUserTyping(data: unknown) { + const error = safe(async () => { + const { roomId } = userTypingPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (isEmpty(members)) return; + + const isMember = members.find((member) => member.id === user.id); + if (!isMember) + throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); + + this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { + roomId, + userId: user.id, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } } diff --git a/services/server/src/wss/handlers/peer.ts b/services/server/src/wss/handlers/peer.ts index efaed222..283d29c8 100644 --- a/services/server/src/wss/handlers/peer.ts +++ b/services/server/src/wss/handlers/peer.ts @@ -5,8 +5,7 @@ import { Wss } from "@litespace/types"; import { getGhostCall } from "@litespace/sol/ghost"; import { cache } from "@/lib/cache"; import { id, string } from "@/validation/utils"; -import { WSSHandler } from "./base"; - +import { WssHandler } from "@/wss/handlers/base"; import zod from "zod"; import { isEmpty } from "lodash"; @@ -15,7 +14,10 @@ const registerPeerPayload = zod.object({ peer: zod.string() }); const stdout = logger("wss"); -export class PeerHandler extends WSSHandler { +/** + * @deprecated should be removed in favor of the new call-cache-based design. + */ +export class Peer extends WssHandler { async peerOpened(data: unknown) { const error = await safe(async () => { const { callId, peerId } = peerPayload.parse(data); @@ -46,8 +48,7 @@ export class PeerHandler extends WSSHandler { if (isGhost(user)) await cache.peer.setGhostPeerId(getGhostCall(user), peer); - if (isTutor(user)) - await cache.peer.setUserPeerId(user.id, peer); + if (isTutor(user)) await cache.peer.setUserPeerId(user.id, peer); // notify peers to refetch the peer id if needed }); diff --git a/services/server/src/wss/index.ts b/services/server/src/wss/index.ts index 8d6f168d..442a0ccd 100644 --- a/services/server/src/wss/index.ts +++ b/services/server/src/wss/index.ts @@ -1,32 +1,14 @@ import { Socket } from "socket.io"; -import { WSSHandlers } from "./handlers"; +import { WssHandlers } from "@/wss/handlers"; import { logger } from "@litespace/sol"; -import { Wss } from "@litespace/types"; const stdout = logger("wss"); -export function wssConnectionHandler(socket: Socket) { +export function wssHandler(socket: Socket) { const user = socket.request.user; - if (!user) { - stdout.warning("(function) wssHandler: No user has been found in the request obj!"); - return; - } - const handlers = new WSSHandlers(socket, user); - - handlers.connection.connect(); - socket.on(Wss.ClientEvent.Disconnect, handlers.connection.disconnect.bind(handlers.connection)); - - socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall.bind(handlers.call)); - socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall.bind(handlers.call)); - - socket.on(Wss.ClientEvent.SendMessage, handlers.message.sendMessage.bind(handlers.message)); - socket.on(Wss.ClientEvent.UpdateMessage, handlers.message.updateMessage.bind(handlers.message)); - socket.on(Wss.ClientEvent.DeleteMessage, handlers.message.deleteMessage.bind(handlers.message)); - - socket.on(Wss.ClientEvent.PeerOpened, handlers.peer.peerOpened.bind(handlers.peer)); - socket.on(Wss.ClientEvent.RegisterPeer, handlers.peer.registerPeer.bind(handlers.peer)); - - socket.on(Wss.ClientEvent.ToggleCamera, handlers.inputDevices.toggleCamera.bind(handlers.inputDevices)); - socket.on(Wss.ClientEvent.ToggleMic, handlers.inputDevices.toggleMic.bind(handlers.inputDevices)); - socket.on(Wss.ClientEvent.UserTyping, handlers.inputDevices.userTyping.bind(handlers.inputDevices)); + if (!user) + return stdout.warning( + "wssHandler: No user has been found in the request obj!" + ); + return new WssHandlers(socket, user); }