diff --git a/services/server/src/wss/handler.ts b/services/server/src/wss/handlers/base.ts similarity index 100% rename from services/server/src/wss/handler.ts rename to services/server/src/wss/handlers/base.ts diff --git a/services/server/src/wss/handlers/call.ts b/services/server/src/wss/handlers/call.ts index ddf1e2d0b..07335ae43 100644 --- a/services/server/src/wss/handlers/call.ts +++ b/services/server/src/wss/handlers/call.ts @@ -3,8 +3,10 @@ import { canJoinCall } from "@/lib/call"; import { isGhost } from "@litespace/auth"; import { logger, safe } from "@litespace/sol"; import { ICall, Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; + import zod from "zod"; -import { WSSHandler } from "../handler"; +import { asCallRoomId } from "../utils"; const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; const stdout = logger("wss"); @@ -25,6 +27,7 @@ export class CallHandler extends WSSHandler { const user = this.user; // todo: add ghost as a member of the call + if (!user) return stdout.error("user undefined!"); if (isGhost(user)) return stdout.warning("Unsupported"); stdout.info(`User ${user.id} is joining call ${callId}.`); @@ -39,7 +42,7 @@ export class CallHandler extends WSSHandler { // add user to the call by inserting row to call_members relation await cache.call.addMember({ userId: user.id, callId: callId }); - this.socket.join(this.asCallRoomId(callId)); + this.socket.join(asCallRoomId(callId)); stdout.info(`User ${user.id} has joined call ${callId}.`); @@ -51,7 +54,7 @@ export class CallHandler extends WSSHandler { // NOTE: the user notifies himself as well that he successfully joined the call. this.broadcast( Wss.ServerEvent.MemberJoinedCall, - this.asCallRoomId(callId), + asCallRoomId(callId), { userId: user.id } // TODO: define the payload struct type in the types package ); }); @@ -82,16 +85,12 @@ export class CallHandler extends WSSHandler { // notify members that a member has left the call this.socket.broadcast - .to(this.asCallRoomId(callId)) + .to(asCallRoomId(callId)) .emit(Wss.ServerEvent.MemberLeftCall, { userId: user.id, }); }); if (result instanceof Error) stdout.error(result.message); } - - private asCallRoomId(callId: number) { - return `call:${callId}`; - } } diff --git a/services/server/src/wss/handlers/connection.ts b/services/server/src/wss/handlers/connection.ts new file mode 100644 index 000000000..6a50904f1 --- /dev/null +++ b/services/server/src/wss/handlers/connection.ts @@ -0,0 +1,121 @@ +import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth"; +import { dayjs, logger, safe } from "@litespace/sol"; +import { IUser, Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; +import { calls, rooms, users } from "@litespace/models"; +import { background } from "@/workers"; +import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; +import { asCallRoomId, asChatRoomId } from "@/wss/utils"; +import { cache } from "@/lib/cache"; +import { getGhostCall } from "@litespace/sol/ghost"; + +const stdout = logger("wss"); + +export class ConnectionHandler extends WSSHandler { + async connect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: true }); + this.announceStatus(info); + + await this.joinRooms(); + if (isAdmin(this.user)) this.emitServerStats(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async disconnect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: false }); + this.announceStatus(info); + + await this.deregisterPeer(); + await this.removeUserFromCalls(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + private async announceStatus(user: IUser.Self) { + const userRooms = await rooms.findMemberFullRoomIds(user.id); + + for (const room of userRooms) { + this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), { + online: user.online, + }); + } + } + + private async emitServerStats() { + background.on("message", (message: PartentPortMessage) => { + if (message.type === PartentPortMessageType.Stats) + return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats); + }); + } + + private async joinRooms() { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { list } = await rooms.findMemberRooms({ userId: user.id }); + + this.socket.join(list.map((roomId) => asChatRoomId(roomId))); + // private channel + this.socket.join(user.id.toString()); + + if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache); + if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats); + + // todo: get user calls from cache + const callsList = await calls.find({ + users: [user.id], + full: true, + after: dayjs.utc().startOf("day").toISOString(), + before: dayjs.utc().add(1, "day").toISOString(), + }); + + this.socket.join( + callsList.list.map((call) => asCallRoomId(call.id)) + ); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + /** + * Remove ghost and tutor peer id from the cache. + * + * @note should be called when the socket disconnects from the server. + */ + private async deregisterPeer() { + // todo: notify peers that the current user got disconnected + const user = this.user; + const display = isGhost(user) ? user : user.email; + stdout.info(`Deregister peer for: ${display}`); + + if (isGhost(user)) + return await cache.peer.removeGhostPeerId(getGhostCall(user)); + if (isTutor(user)) await cache.peer.removeUserPeerId(user.id); + } + + private async removeUserFromCalls() { + const user = this.user; + if (isGhost(user)) return; + + const callId = await cache.call.removeMemberByUserId(user.id); + if (!callId) return; + + // notify members that a member has left the call + this.socket.broadcast + .to(asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + } +} + diff --git a/services/server/src/wss/handlers/index.ts b/services/server/src/wss/handlers/index.ts index fb04c4ec6..14e474a44 100644 --- a/services/server/src/wss/handlers/index.ts +++ b/services/server/src/wss/handlers/index.ts @@ -1,11 +1,24 @@ import { Socket } from "socket.io"; -import { CallHandler } from "./call"; import { IUser } from "@litespace/types"; +import { CallHandler } from "./call"; +import { ConnectionHandler } from "./connection"; +import { MessageHandler } from "./message"; +import { PeerHandler } from "./peer"; +import { InputDevicesHandler } from "./inputDevices"; + export class WSSHandlers { + public readonly connection: ConnectionHandler; public readonly call: CallHandler; + public readonly message: MessageHandler; + public readonly peer: PeerHandler; + public readonly inputDevices: InputDevicesHandler; constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.connection = new ConnectionHandler(socket, user); this.call = new CallHandler(socket, user); + this.message = new MessageHandler(socket, user); + this.peer = new PeerHandler(socket, user); + this.inputDevices = new InputDevicesHandler(socket, user); } } diff --git a/services/server/src/wss/handlers/inputDevices.ts b/services/server/src/wss/handlers/inputDevices.ts new file mode 100644 index 000000000..aba00a7dd --- /dev/null +++ b/services/server/src/wss/handlers/inputDevices.ts @@ -0,0 +1,68 @@ +import { logger, safe } from "@litespace/sol"; +import { rooms } from "@litespace/models"; +import { isGhost } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { id, boolean } from "@/validation/utils"; +import { WSSHandler } from "./base"; + +import zod from "zod"; +import { isEmpty } from "lodash"; + +const toggleCameraPayload = zod.object({ call: id, camera: boolean }); +const toggleMicPayload = zod.object({ call: id, mic: boolean }); +const userTypingPayload = zod.object({ roomId: zod.number() }); + +const stdout = logger("wss"); + +export class InputDevicesHandler extends WSSHandler { + async toggleCamera(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, camera } = toggleCameraPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), { + user: user.id, + camera, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async toggleMic(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, mic } = toggleMicPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), { + user: user.id, + mic, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async userTyping(data: unknown) { + const error = safe(async () => { + const { roomId } = userTypingPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (isEmpty(members)) return; + + const isMember = members.find((member) => member.id === user.id); + if (!isMember) + throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); + + this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { + roomId, + userId: user.id, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/message.ts b/services/server/src/wss/handlers/message.ts new file mode 100644 index 000000000..8d8b912e4 --- /dev/null +++ b/services/server/src/wss/handlers/message.ts @@ -0,0 +1,103 @@ +import { isGhost } from "@litespace/auth"; +import { logger, safe, sanitizeMessage } from "@litespace/sol"; +import { Wss } from "@litespace/types"; +import { WSSHandler } from "@/wss/handlers/base"; +import { messages, rooms } from "@litespace/models"; +import { asChatRoomId } from "@/wss/utils"; +import wss from "@/validation/wss"; + +import zod from "zod"; +import { id, string, withNamedId } from "@/validation/utils"; + +const stdout = logger("wss"); + +const updateMessagePayload = zod.object({ text: string, id }); + +export class MessageHandler extends WSSHandler { + async sendMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { roomId, text } = wss.message.send.parse(data); + const userId = user.id; + + stdout.log(`u:${userId} is send a message to r:${roomId}`); + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (!members) throw Error("Room not found"); + + const member = members.map((member) => member.id).includes(userId); + if (!member) throw new Error("Unauthorized"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) return; // empty message + const message = await messages.create({ + text: sanitized, + userId, + roomId, + }); + + this.broadcast( + Wss.ServerEvent.RoomMessage, + asChatRoomId(roomId), + message + ); + }); + if (error instanceof Error) stdout.error(error); + } + + async updateMessage(data: unknown) { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id, text } = updateMessagePayload.parse(data); + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) throw new Error("Invalid message"); + + const updated = await messages.update(id, { text: sanitized }); + if (!updated) throw new Error("Mesasge not update; should never happen."); + + this.broadcast( + Wss.ServerEvent.RoomMessageUpdated, + asChatRoomId(message.roomId), + updated + ); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async deleteMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id }: { id: number } = withNamedId("id").parse(data); + + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + await messages.markAsDeleted(id); + + this.broadcast( + Wss.ServerEvent.RoomMessageDeleted, + asChatRoomId(message.roomId), + { + roomId: message.roomId, + messageId: message.id, + } + ); + }); + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/peer.ts b/services/server/src/wss/handlers/peer.ts new file mode 100644 index 000000000..efaed2220 --- /dev/null +++ b/services/server/src/wss/handlers/peer.ts @@ -0,0 +1,56 @@ +import { logger, safe } from "@litespace/sol"; +import { calls } from "@litespace/models"; +import { isGhost, isTutor, isUser } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { getGhostCall } from "@litespace/sol/ghost"; +import { cache } from "@/lib/cache"; +import { id, string } from "@/validation/utils"; +import { WSSHandler } from "./base"; + +import zod from "zod"; +import { isEmpty } from "lodash"; + +const peerPayload = zod.object({ callId: id, peerId: string }); +const registerPeerPayload = zod.object({ peer: zod.string() }); + +const stdout = logger("wss"); + +export class PeerHandler extends WSSHandler { + async peerOpened(data: unknown) { + const error = await safe(async () => { + const { callId, peerId } = peerPayload.parse(data); + const user = this.user; + + const members = await calls.findCallMembers([callId]); + if (isEmpty(members)) return; + + const memberIds = members.map((member) => member.userId); + const isMember = isUser(user) && memberIds.includes(user.id); + const allowed = isMember || isGhost(this.user); + if (!allowed) return; + + this.socket.join(callId.toString()); + this.socket + .to(callId.toString()) + .emit(Wss.ServerEvent.UserJoinedCall, { peerId }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async registerPeer(data: unknown) { + const result = await safe(async () => { + const { peer } = registerPeerPayload.parse(data); + const user = this.user; + const id = isGhost(user) ? user : user.email; + stdout.info(`Register peer: ${peer} for ${id}`); + + if (isGhost(user)) + await cache.peer.setGhostPeerId(getGhostCall(user), peer); + if (isTutor(user)) + await cache.peer.setUserPeerId(user.id, peer); + + // notify peers to refetch the peer id if needed + }); + if (result instanceof Error) stdout.error(result.message); + } +} diff --git a/services/server/src/wss/index.ts b/services/server/src/wss/index.ts index 189ba0821..8d6f168d0 100644 --- a/services/server/src/wss/index.ts +++ b/services/server/src/wss/index.ts @@ -3,7 +3,6 @@ import { WSSHandlers } from "./handlers"; import { logger } from "@litespace/sol"; import { Wss } from "@litespace/types"; - const stdout = logger("wss"); export function wssConnectionHandler(socket: Socket) { @@ -13,6 +12,21 @@ export function wssConnectionHandler(socket: Socket) { return; } const handlers = new WSSHandlers(socket, user); - socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall); - socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall); + + handlers.connection.connect(); + socket.on(Wss.ClientEvent.Disconnect, handlers.connection.disconnect.bind(handlers.connection)); + + socket.on(Wss.ClientEvent.JoinCall, handlers.call.onJoinCall.bind(handlers.call)); + socket.on(Wss.ClientEvent.LeaveCall, handlers.call.onLeaveCall.bind(handlers.call)); + + socket.on(Wss.ClientEvent.SendMessage, handlers.message.sendMessage.bind(handlers.message)); + socket.on(Wss.ClientEvent.UpdateMessage, handlers.message.updateMessage.bind(handlers.message)); + socket.on(Wss.ClientEvent.DeleteMessage, handlers.message.deleteMessage.bind(handlers.message)); + + socket.on(Wss.ClientEvent.PeerOpened, handlers.peer.peerOpened.bind(handlers.peer)); + socket.on(Wss.ClientEvent.RegisterPeer, handlers.peer.registerPeer.bind(handlers.peer)); + + socket.on(Wss.ClientEvent.ToggleCamera, handlers.inputDevices.toggleCamera.bind(handlers.inputDevices)); + socket.on(Wss.ClientEvent.ToggleMic, handlers.inputDevices.toggleMic.bind(handlers.inputDevices)); + socket.on(Wss.ClientEvent.UserTyping, handlers.inputDevices.userTyping.bind(handlers.inputDevices)); } diff --git a/services/server/src/wss/oldhandler.ts b/services/server/src/wss/oldhandler.ts index 0330a9305..44be52fcf 100644 --- a/services/server/src/wss/oldhandler.ts +++ b/services/server/src/wss/oldhandler.ts @@ -35,6 +35,9 @@ const onLeaveCallPayload = zod.object({ callId: zod.number() }); const stdout = logger("wss"); +/** + * @deprecated + */ export class WssHandler { socket: Socket; user: IUser.Self | IUser.Ghost; @@ -50,8 +53,10 @@ export class WssHandler { this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); this.socket.on(Wss.ClientEvent.UpdateMessage, this.updateMessage.bind(this)); this.socket.on(Wss.ClientEvent.DeleteMessage, this.deleteMessage.bind(this)); + this.socket.on(Wss.ClientEvent.PeerOpened, this.peerOpened.bind(this)); this.socket.on(Wss.ClientEvent.RegisterPeer, this.registerPeer.bind(this)); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); this.socket.on(Wss.ClientEvent.ToggleCamera, this.toggleCamera.bind(this)); this.socket.on(Wss.ClientEvent.ToggleMic, this.toggleMic.bind(this)); diff --git a/services/server/src/wss/utils.ts b/services/server/src/wss/utils.ts new file mode 100644 index 000000000..86a11d9e2 --- /dev/null +++ b/services/server/src/wss/utils.ts @@ -0,0 +1,7 @@ +export function asCallRoomId(callId: number) { + return `call:${callId}`; +} + +export function asChatRoomId(roomId: number) { + return `room:${roomId}`; +}