diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..7c3f1724 Binary files /dev/null and b/.DS_Store differ diff --git a/services/server/src/wss/handlers/base.ts b/services/server/src/wss/handlers/base.ts new file mode 100644 index 00000000..5647ab1b --- /dev/null +++ b/services/server/src/wss/handlers/base.ts @@ -0,0 +1,21 @@ +import { IUser, Wss } from "@litespace/types"; +import { Socket } from "socket.io"; + +export abstract class WssHandler { + protected readonly socket: Socket; + protected readonly user: IUser.Self | IUser.Ghost; + + constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.socket = socket; + this.user = user; + } + + protected async broadcast( + event: T, + room: string, + ...data: Parameters + ) { + this.socket.emit(event, ...data); + this.socket.broadcast.to(room).emit(event, ...data); + } +} diff --git a/services/server/src/wss/handlers/call.ts b/services/server/src/wss/handlers/call.ts new file mode 100644 index 00000000..cbb07ed2 --- /dev/null +++ b/services/server/src/wss/handlers/call.ts @@ -0,0 +1,104 @@ +import { cache } from "@/lib/cache"; +import { canJoinCall } from "@/lib/call"; +import { isGhost } from "@litespace/auth"; +import { logger, safe } from "@litespace/sol"; +import { ICall, Wss } from "@litespace/types"; +import { WssHandler } from "@/wss/handlers/base"; +import { asCallRoomId } from "@/wss/utils"; + +import zod from "zod"; + +const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[]; +const stdout = logger("wss"); + +const onJoinCallPayload = zod.object({ + callId: zod.number(), + type: zod.enum(callTypes), +}); +const onLeaveCallPayload = zod.object({ callId: zod.number() }); + +export class Call extends WssHandler { + public init(): Call { + this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this)); + this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this)); + return this; + } + + /* + * This event listener will be called whenever a user + * joins a call. For instance, when + * users are joining a lesson, or a tutor is joining an + * interview. + */ + async onJoinCall(data: unknown) { + const result = await safe(async () => { + const { callId, type } = onJoinCallPayload.parse(data); + + const user = this.user; + // todo: add ghost as a member of the call + if (!user) return stdout.error("user undefined!"); + if (isGhost(user)) return stdout.warning("Unsupported"); + + stdout.info(`User ${user.id} is joining call ${callId}.`); + + const canJoin = await canJoinCall({ + userId: user.id, + callType: type, + callId, + }); + + if (!canJoin) throw Error("Forbidden"); + + // add user to the call by inserting row to call_members relation + await cache.call.addMember({ userId: user.id, callId: callId }); + this.socket.join(asCallRoomId(callId)); + + stdout.info(`User ${user.id} has joined call ${callId}.`); + + // TODO: retrieve user data (ICall.PopulatedMember) from the database + // discuss with the team if shall we retrieve it from postgres, + // or store it in redis in the first place. + + // notify members that a new member has joined the call + // NOTE: the user notifies himself as well that he successfully joined the call. + this.broadcast( + Wss.ServerEvent.MemberJoinedCall, + asCallRoomId(callId), + { userId: user.id } // TODO: define the payload struct type in the types package + ); + }); + if (result instanceof Error) stdout.error(result.message); + } + + /* + * This event listener will be called whenever a user + * leaves a call. For instance, when users are leaving + * a lesson, or a tutor is leaving an interview. + */ + async onLeaveCall(data: unknown) { + const result = await safe(async () => { + const { callId } = onLeaveCallPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + stdout.info(`User ${user.id} is leaving call ${callId}.`); + + // remove user from the call by deleting the corresponding row from call_members relation + await cache.call.removeMember({ + userId: user.id, + callId: callId, + }); + + stdout.info(`User ${user.id} has left call ${callId}.`); + + // notify members that a member has left the call + this.socket.broadcast + .to(asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + }); + if (result instanceof Error) stdout.error(result.message); + } +} diff --git a/services/server/src/wss/handlers/connection.ts b/services/server/src/wss/handlers/connection.ts new file mode 100644 index 00000000..16c677d9 --- /dev/null +++ b/services/server/src/wss/handlers/connection.ts @@ -0,0 +1,124 @@ +import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth"; +import { dayjs, logger, safe } from "@litespace/sol"; +import { IUser, Wss } from "@litespace/types"; +import { WssHandler } from "@/wss/handlers/base"; +import { calls, rooms, users } from "@litespace/models"; +import { background } from "@/workers"; +import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages"; +import { asCallRoomId, asChatRoomId } from "@/wss/utils"; +import { cache } from "@/lib/cache"; +import { getGhostCall } from "@litespace/sol/ghost"; + +const stdout = logger("wss"); + +export class Connection extends WssHandler { + public init(): Connection { + this.connect(); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); + return this; + } + + async connect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: true }); + this.announceStatus(info); + + await this.joinRooms(); + if (isAdmin(this.user)) this.emitServerStats(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async disconnect() { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const info = await users.update(user.id, { online: false }); + this.announceStatus(info); + + await this.deregisterPeer(); + await this.removeUserFromCalls(); + }); + if (error instanceof Error) stdout.error(error.message); + } + + private async announceStatus(user: IUser.Self) { + const userRooms = await rooms.findMemberFullRoomIds(user.id); + + for (const room of userRooms) { + this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), { + online: user.online, + }); + } + } + + private async emitServerStats() { + background.on("message", (message: PartentPortMessage) => { + if (message.type === PartentPortMessageType.Stats) + return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats); + }); + } + + private async joinRooms() { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { list } = await rooms.findMemberRooms({ userId: user.id }); + + this.socket.join(list.map((roomId) => asChatRoomId(roomId))); + // private channel + this.socket.join(user.id.toString()); + + if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache); + if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats); + + // todo: get user calls from cache + const callsList = await calls.find({ + users: [user.id], + full: true, + after: dayjs.utc().startOf("day").toISOString(), + before: dayjs.utc().add(1, "day").toISOString(), + }); + + this.socket.join(callsList.list.map((call) => asCallRoomId(call.id))); + }); + + if (error instanceof Error) stdout.error(error.message); + } + + /** + * Remove ghost and tutor peer id from the cache. + * + * @note should be called when the socket disconnects from the server. + */ + private async deregisterPeer() { + // todo: notify peers that the current user got disconnected + const user = this.user; + const display = isGhost(user) ? user : user.email; + stdout.info(`Deregister peer for: ${display}`); + + if (isGhost(user)) + return await cache.peer.removeGhostPeerId(getGhostCall(user)); + if (isTutor(user)) await cache.peer.removeUserPeerId(user.id); + } + + private async removeUserFromCalls() { + const user = this.user; + if (isGhost(user)) return; + + const callId = await cache.call.removeMemberByUserId(user.id); + if (!callId) return; + + // notify members that a member has left the call + this.socket.broadcast + .to(asCallRoomId(callId)) + .emit(Wss.ServerEvent.MemberLeftCall, { + userId: user.id, + }); + } +} diff --git a/services/server/src/wss/handlers/index.ts b/services/server/src/wss/handlers/index.ts new file mode 100644 index 00000000..e72b2c44 --- /dev/null +++ b/services/server/src/wss/handlers/index.ts @@ -0,0 +1,24 @@ +import { Socket } from "socket.io"; +import { IUser } from "@litespace/types"; + +import { Call } from "@/wss/handlers/call"; +import { Connection } from "@/wss/handlers/connection"; +import { Messages } from "@/wss/handlers/message"; +import { Peer } from "@/wss/handlers/peer"; +import { InputDevices } from "./inputDevices"; + +export class WssHandlers { + public readonly connection: Connection; + public readonly call: Call; + public readonly messages: Messages; + public readonly peer: Peer; + public readonly inputDevices: InputDevices; + + constructor(socket: Socket, user: IUser.Self | IUser.Ghost) { + this.connection = new Connection(socket, user).init(); + this.call = new Call(socket, user).init(); + this.messages = new Messages(socket, user).init(); + this.peer = new Peer(socket, user); + this.inputDevices = new InputDevices(socket, user).init(); + } +} diff --git a/services/server/src/wss/handlers/inputDevices.ts b/services/server/src/wss/handlers/inputDevices.ts new file mode 100644 index 00000000..c136b2a2 --- /dev/null +++ b/services/server/src/wss/handlers/inputDevices.ts @@ -0,0 +1,52 @@ +import { logger, safe } from "@litespace/sol"; +import { rooms } from "@litespace/models"; +import { isGhost } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { id, boolean } from "@/validation/utils"; +import { WssHandler } from "@/wss/handlers/base"; +import zod from "zod"; +import { isEmpty } from "lodash"; + +const toggleCameraPayload = zod.object({ call: id, camera: boolean }); +const toggleMicPayload = zod.object({ call: id, mic: boolean }); + +const stdout = logger("wss"); + +export class InputDevices extends WssHandler { + public init(): InputDevices { + this.socket.on( + Wss.ClientEvent.ToggleCamera, + this.onToggleCamera.bind(this) + ); + this.socket.on(Wss.ClientEvent.ToggleMic, this.onToggleMic.bind(this)); + return this; + } + + async onToggleCamera(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, camera } = toggleCameraPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), { + user: user.id, + camera, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async onToggleMic(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + const { call, mic } = toggleMicPayload.parse(data); + // todo: add validation + this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), { + user: user.id, + mic, + }); + }); + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/message.ts b/services/server/src/wss/handlers/message.ts new file mode 100644 index 00000000..212369fd --- /dev/null +++ b/services/server/src/wss/handlers/message.ts @@ -0,0 +1,141 @@ +import { isGhost } from "@litespace/auth"; +import { logger, safe, sanitizeMessage } from "@litespace/sol"; +import { Wss } from "@litespace/types"; +import { WssHandler } from "@/wss/handlers/base"; +import { messages, rooms } from "@litespace/models"; +import { asChatRoomId } from "@/wss/utils"; +import { id, string, withNamedId } from "@/validation/utils"; +import wss from "@/validation/wss"; +import zod from "zod"; +import { isEmpty } from "lodash"; + +const stdout = logger("wss"); + +const updateMessagePayload = zod.object({ text: string, id }); +const userTypingPayload = zod.object({ roomId: zod.number() }); + +export class Messages extends WssHandler { + public init(): Messages { + this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); + this.socket.on( + Wss.ClientEvent.UpdateMessage, + this.updateMessage.bind(this) + ); + this.socket.on( + Wss.ClientEvent.DeleteMessage, + this.deleteMessage.bind(this) + ); + this.socket.on(Wss.ClientEvent.UserTyping, this.onUserTyping.bind(this)); + return this; + } + + async sendMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { roomId, text } = wss.message.send.parse(data); + const userId = user.id; + + stdout.log(`u:${userId} is send a message to r:${roomId}`); + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (!members) throw Error("Room not found"); + + const member = members.map((member) => member.id).includes(userId); + if (!member) throw new Error("Unauthorized"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) return; // empty message + const message = await messages.create({ + text: sanitized, + userId, + roomId, + }); + + this.broadcast( + Wss.ServerEvent.RoomMessage, + asChatRoomId(roomId), + message + ); + }); + if (error instanceof Error) stdout.error(error); + } + + async updateMessage(data: unknown) { + const error = await safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id, text } = updateMessagePayload.parse(data); + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + const sanitized = sanitizeMessage(text); + if (!sanitized) throw new Error("Invalid message"); + + const updated = await messages.update(id, { text: sanitized }); + if (!updated) throw new Error("Mesasge not update; should never happen."); + + this.broadcast( + Wss.ServerEvent.RoomMessageUpdated, + asChatRoomId(message.roomId), + updated + ); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async deleteMessage(data: unknown) { + const error = safe(async () => { + const user = this.user; + if (isGhost(user)) return; + + const { id }: { id: number } = withNamedId("id").parse(data); + + const message = await messages.findById(id); + if (!message || message.deleted) throw new Error("Message not found"); + + const owner = message.userId === user.id; + if (!owner) throw new Error("Forbidden"); + + await messages.markAsDeleted(id); + + this.broadcast( + Wss.ServerEvent.RoomMessageDeleted, + asChatRoomId(message.roomId), + { + roomId: message.roomId, + messageId: message.id, + } + ); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async onUserTyping(data: unknown) { + const error = safe(async () => { + const { roomId } = userTypingPayload.parse(data); + + const user = this.user; + if (isGhost(user)) return; + + const members = await rooms.findRoomMembers({ roomIds: [roomId] }); + if (isEmpty(members)) return; + + const isMember = members.find((member) => member.id === user.id); + if (!isMember) + throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`); + + this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, { + roomId, + userId: user.id, + }); + }); + + if (error instanceof Error) stdout.error(error.message); + } +} diff --git a/services/server/src/wss/handlers/peer.ts b/services/server/src/wss/handlers/peer.ts new file mode 100644 index 00000000..283d29c8 --- /dev/null +++ b/services/server/src/wss/handlers/peer.ts @@ -0,0 +1,57 @@ +import { logger, safe } from "@litespace/sol"; +import { calls } from "@litespace/models"; +import { isGhost, isTutor, isUser } from "@litespace/auth"; +import { Wss } from "@litespace/types"; +import { getGhostCall } from "@litespace/sol/ghost"; +import { cache } from "@/lib/cache"; +import { id, string } from "@/validation/utils"; +import { WssHandler } from "@/wss/handlers/base"; +import zod from "zod"; +import { isEmpty } from "lodash"; + +const peerPayload = zod.object({ callId: id, peerId: string }); +const registerPeerPayload = zod.object({ peer: zod.string() }); + +const stdout = logger("wss"); + +/** + * @deprecated should be removed in favor of the new call-cache-based design. + */ +export class Peer extends WssHandler { + async peerOpened(data: unknown) { + const error = await safe(async () => { + const { callId, peerId } = peerPayload.parse(data); + const user = this.user; + + const members = await calls.findCallMembers([callId]); + if (isEmpty(members)) return; + + const memberIds = members.map((member) => member.userId); + const isMember = isUser(user) && memberIds.includes(user.id); + const allowed = isMember || isGhost(this.user); + if (!allowed) return; + + this.socket.join(callId.toString()); + this.socket + .to(callId.toString()) + .emit(Wss.ServerEvent.UserJoinedCall, { peerId }); + }); + if (error instanceof Error) stdout.error(error.message); + } + + async registerPeer(data: unknown) { + const result = await safe(async () => { + const { peer } = registerPeerPayload.parse(data); + const user = this.user; + const id = isGhost(user) ? user : user.email; + stdout.info(`Register peer: ${peer} for ${id}`); + + if (isGhost(user)) + await cache.peer.setGhostPeerId(getGhostCall(user), peer); + if (isTutor(user)) await cache.peer.setUserPeerId(user.id, peer); + + // notify peers to refetch the peer id if needed + }); + if (result instanceof Error) stdout.error(result.message); + } +} diff --git a/services/server/src/wss/index.ts b/services/server/src/wss/index.ts index c9b8acfb..442a0ccd 100644 --- a/services/server/src/wss/index.ts +++ b/services/server/src/wss/index.ts @@ -1 +1,14 @@ -export { wssHandler } from "@/wss/handler"; +import { Socket } from "socket.io"; +import { WssHandlers } from "@/wss/handlers"; +import { logger } from "@litespace/sol"; + +const stdout = logger("wss"); + +export function wssHandler(socket: Socket) { + const user = socket.request.user; + if (!user) + return stdout.warning( + "wssHandler: No user has been found in the request obj!" + ); + return new WssHandlers(socket, user); +} diff --git a/services/server/src/wss/handler.ts b/services/server/src/wss/oldhandler.ts similarity index 98% rename from services/server/src/wss/handler.ts rename to services/server/src/wss/oldhandler.ts index 8c178e42..44be52fc 100644 --- a/services/server/src/wss/handler.ts +++ b/services/server/src/wss/oldhandler.ts @@ -35,6 +35,9 @@ const onLeaveCallPayload = zod.object({ callId: zod.number() }); const stdout = logger("wss"); +/** + * @deprecated + */ export class WssHandler { socket: Socket; user: IUser.Self | IUser.Ghost; @@ -46,18 +49,14 @@ export class WssHandler { } async initialize() { - this.connect(); + this.connect(); this.socket.on(Wss.ClientEvent.SendMessage, this.sendMessage.bind(this)); - this.socket.on( - Wss.ClientEvent.UpdateMessage, - this.updateMessage.bind(this) - ); - this.socket.on( - Wss.ClientEvent.DeleteMessage, - this.deleteMessage.bind(this) - ); + this.socket.on(Wss.ClientEvent.UpdateMessage, this.updateMessage.bind(this)); + this.socket.on(Wss.ClientEvent.DeleteMessage, this.deleteMessage.bind(this)); + this.socket.on(Wss.ClientEvent.PeerOpened, this.peerOpened.bind(this)); this.socket.on(Wss.ClientEvent.RegisterPeer, this.registerPeer.bind(this)); + this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this)); this.socket.on(Wss.ClientEvent.ToggleCamera, this.toggleCamera.bind(this)); this.socket.on(Wss.ClientEvent.ToggleMic, this.toggleMic.bind(this)); @@ -488,3 +487,4 @@ export function wssHandler(socket: Socket) { } return new WssHandler(socket, user); } + diff --git a/services/server/src/wss/utils.ts b/services/server/src/wss/utils.ts new file mode 100644 index 00000000..86a11d9e --- /dev/null +++ b/services/server/src/wss/utils.ts @@ -0,0 +1,7 @@ +export function asCallRoomId(callId: number) { + return `call:${callId}`; +} + +export function asChatRoomId(roomId: number) { + return `room:${roomId}`; +}