Skip to content

Commit

Permalink
chore: wss server handler has been refactored.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmoehabb committed Dec 15, 2024
1 parent 0c4120e commit 2bf3de9
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 12 deletions.
File renamed without changes.
15 changes: 7 additions & 8 deletions services/server/src/wss/handlers/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { canJoinCall } from "@/lib/call";
import { isGhost } from "@litespace/auth";
import { logger, safe } from "@litespace/sol";
import { ICall, Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";

import zod from "zod";
import { WSSHandler } from "../handler";
import { asCallRoomId } from "../utils";

const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[];
const stdout = logger("wss");
Expand All @@ -25,6 +27,7 @@ export class CallHandler extends WSSHandler {

const user = this.user;
// todo: add ghost as a member of the call
if (!user) return stdout.error("user undefined!");
if (isGhost(user)) return stdout.warning("Unsupported");

stdout.info(`User ${user.id} is joining call ${callId}.`);
Expand All @@ -39,7 +42,7 @@ export class CallHandler extends WSSHandler {

// add user to the call by inserting row to call_members relation
await cache.call.addMember({ userId: user.id, callId: callId });
this.socket.join(this.asCallRoomId(callId));
this.socket.join(asCallRoomId(callId));

stdout.info(`User ${user.id} has joined call ${callId}.`);

Expand All @@ -51,7 +54,7 @@ export class CallHandler extends WSSHandler {
// NOTE: the user notifies himself as well that he successfully joined the call.
this.broadcast(
Wss.ServerEvent.MemberJoinedCall,
this.asCallRoomId(callId),
asCallRoomId(callId),
{ userId: user.id } // TODO: define the payload struct type in the types package
);
});
Expand Down Expand Up @@ -82,16 +85,12 @@ export class CallHandler extends WSSHandler {

// notify members that a member has left the call
this.socket.broadcast
.to(this.asCallRoomId(callId))
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
});
if (result instanceof Error) stdout.error(result.message);
}

private asCallRoomId(callId: number) {
return `call:${callId}`;
}
}

121 changes: 121 additions & 0 deletions services/server/src/wss/handlers/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth";
import { dayjs, logger, safe } from "@litespace/sol";
import { IUser, Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";
import { calls, rooms, users } from "@litespace/models";
import { background } from "@/workers";
import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages";
import { asCallRoomId, asChatRoomId } from "@/wss/utils";
import { cache } from "@/lib/cache";
import { getGhostCall } from "@litespace/sol/ghost";

const stdout = logger("wss");

export class ConnectionHandler extends WSSHandler {
async connect() {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const info = await users.update(user.id, { online: true });
this.announceStatus(info);

await this.joinRooms();
if (isAdmin(this.user)) this.emitServerStats();
});
if (error instanceof Error) stdout.error(error.message);
}

async disconnect() {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const info = await users.update(user.id, { online: false });
this.announceStatus(info);

await this.deregisterPeer();
await this.removeUserFromCalls();
});
if (error instanceof Error) stdout.error(error.message);
}

private async announceStatus(user: IUser.Self) {
const userRooms = await rooms.findMemberFullRoomIds(user.id);

for (const room of userRooms) {
this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), {
online: user.online,
});
}
}

private async emitServerStats() {
background.on("message", (message: PartentPortMessage) => {
if (message.type === PartentPortMessageType.Stats)
return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats);
});
}

private async joinRooms() {
const error = await safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const { list } = await rooms.findMemberRooms({ userId: user.id });

this.socket.join(list.map((roomId) => asChatRoomId(roomId)));
// private channel
this.socket.join(user.id.toString());

if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache);
if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats);

// todo: get user calls from cache
const callsList = await calls.find({
users: [user.id],
full: true,
after: dayjs.utc().startOf("day").toISOString(),
before: dayjs.utc().add(1, "day").toISOString(),
});

this.socket.join(
callsList.list.map((call) => asCallRoomId(call.id))
);
});

if (error instanceof Error) stdout.error(error.message);
}

/**
* Remove ghost and tutor peer id from the cache.
*
* @note should be called when the socket disconnects from the server.
*/
private async deregisterPeer() {
// todo: notify peers that the current user got disconnected
const user = this.user;
const display = isGhost(user) ? user : user.email;
stdout.info(`Deregister peer for: ${display}`);

if (isGhost(user))
return await cache.peer.removeGhostPeerId(getGhostCall(user));
if (isTutor(user)) await cache.peer.removeUserPeerId(user.id);
}

private async removeUserFromCalls() {
const user = this.user;
if (isGhost(user)) return;

const callId = await cache.call.removeMemberByUserId(user.id);
if (!callId) return;

// notify members that a member has left the call
this.socket.broadcast
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
}
}

15 changes: 14 additions & 1 deletion services/server/src/wss/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import { Socket } from "socket.io";
import { CallHandler } from "./call";
import { IUser } from "@litespace/types";

import { CallHandler } from "./call";
import { ConnectionHandler } from "./connection";
import { MessageHandler } from "./message";
import { PeerHandler } from "./peer";
import { InputDevicesHandler } from "./inputDevices";

export class WSSHandlers {
public readonly connection: ConnectionHandler;
public readonly call: CallHandler;
public readonly message: MessageHandler;
public readonly peer: PeerHandler;
public readonly inputDevices: InputDevicesHandler;

constructor(socket: Socket, user: IUser.Self | IUser.Ghost) {
this.connection = new ConnectionHandler(socket, user);
this.call = new CallHandler(socket, user);
this.message = new MessageHandler(socket, user);
this.peer = new PeerHandler(socket, user);
this.inputDevices = new InputDevicesHandler(socket, user);
}
}
68 changes: 68 additions & 0 deletions services/server/src/wss/handlers/inputDevices.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { logger, safe } from "@litespace/sol";
import { rooms } from "@litespace/models";
import { isGhost } from "@litespace/auth";
import { Wss } from "@litespace/types";
import { id, boolean } from "@/validation/utils";
import { WSSHandler } from "./base";

import zod from "zod";
import { isEmpty } from "lodash";

const toggleCameraPayload = zod.object({ call: id, camera: boolean });
const toggleMicPayload = zod.object({ call: id, mic: boolean });
const userTypingPayload = zod.object({ roomId: zod.number() });

const stdout = logger("wss");

export class InputDevicesHandler extends WSSHandler {
async toggleCamera(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
const { call, camera } = toggleCameraPayload.parse(data);
// todo: add validation
this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), {
user: user.id,
camera,
});
});
if (error instanceof Error) stdout.error(error.message);
}

async toggleMic(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
const { call, mic } = toggleMicPayload.parse(data);
// todo: add validation
this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), {
user: user.id,
mic,
});
});
if (error instanceof Error) stdout.error(error.message);
}

async userTyping(data: unknown) {
const error = safe(async () => {
const { roomId } = userTypingPayload.parse(data);

const user = this.user;
if (isGhost(user)) return;

const members = await rooms.findRoomMembers({ roomIds: [roomId] });
if (isEmpty(members)) return;

const isMember = members.find((member) => member.id === user.id);
if (!isMember)
throw new Error(`User(${user.id}) isn't member of room Id: ${roomId}`);

this.socket.to(roomId.toString()).emit(Wss.ServerEvent.UserTyping, {
roomId,
userId: user.id,
});
});

if (error instanceof Error) stdout.error(error.message);
}
}
103 changes: 103 additions & 0 deletions services/server/src/wss/handlers/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { isGhost } from "@litespace/auth";
import { logger, safe, sanitizeMessage } from "@litespace/sol";
import { Wss } from "@litespace/types";
import { WSSHandler } from "@/wss/handlers/base";
import { messages, rooms } from "@litespace/models";
import { asChatRoomId } from "@/wss/utils";
import wss from "@/validation/wss";

import zod from "zod";
import { id, string, withNamedId } from "@/validation/utils";

const stdout = logger("wss");

const updateMessagePayload = zod.object({ text: string, id });

export class MessageHandler extends WSSHandler {
async sendMessage(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const { roomId, text } = wss.message.send.parse(data);
const userId = user.id;

stdout.log(`u:${userId} is send a message to r:${roomId}`);

const members = await rooms.findRoomMembers({ roomIds: [roomId] });
if (!members) throw Error("Room not found");

const member = members.map((member) => member.id).includes(userId);
if (!member) throw new Error("Unauthorized");

const sanitized = sanitizeMessage(text);
if (!sanitized) return; // empty message
const message = await messages.create({
text: sanitized,
userId,
roomId,
});

this.broadcast(
Wss.ServerEvent.RoomMessage,
asChatRoomId(roomId),
message
);
});
if (error instanceof Error) stdout.error(error);
}

async updateMessage(data: unknown) {
const error = await safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const { id, text } = updateMessagePayload.parse(data);
const message = await messages.findById(id);
if (!message || message.deleted) throw new Error("Message not found");

const owner = message.userId === user.id;
if (!owner) throw new Error("Forbidden");

const sanitized = sanitizeMessage(text);
if (!sanitized) throw new Error("Invalid message");

const updated = await messages.update(id, { text: sanitized });
if (!updated) throw new Error("Mesasge not update; should never happen.");

this.broadcast(
Wss.ServerEvent.RoomMessageUpdated,
asChatRoomId(message.roomId),
updated
);
});
if (error instanceof Error) stdout.error(error.message);
}

async deleteMessage(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const { id }: { id: number } = withNamedId("id").parse(data);

const message = await messages.findById(id);
if (!message || message.deleted) throw new Error("Message not found");

const owner = message.userId === user.id;
if (!owner) throw new Error("Forbidden");

await messages.markAsDeleted(id);

this.broadcast(
Wss.ServerEvent.RoomMessageDeleted,
asChatRoomId(message.roomId),
{
roomId: message.roomId,
messageId: message.id,
}
);
});
if (error instanceof Error) stdout.error(error.message);
}
}
Loading

0 comments on commit 2bf3de9

Please sign in to comment.