Skip to content

Commit

Permalink
Merge pull request #212 from litespace-org/moehab/wss-handler-refacto…
Browse files Browse the repository at this point in the history
…ring

refactor(server): the wss server handler has been fully refactored.
  • Loading branch information
neuodev authored Dec 15, 2024
2 parents f6dc25b + 0d4d588 commit 21355f0
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 10 deletions.
Binary file added .DS_Store
Binary file not shown.
21 changes: 21 additions & 0 deletions services/server/src/wss/handlers/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IUser, Wss } from "@litespace/types";
import { Socket } from "socket.io";

export abstract class WssHandler {
protected readonly socket: Socket;
protected readonly user: IUser.Self | IUser.Ghost;

constructor(socket: Socket, user: IUser.Self | IUser.Ghost) {
this.socket = socket;
this.user = user;
}

protected async broadcast<T extends keyof Wss.ServerEventsMap>(
event: T,
room: string,
...data: Parameters<Wss.ServerEventsMap[T]>
) {
this.socket.emit(event, ...data);
this.socket.broadcast.to(room).emit(event, ...data);
}
}
104 changes: 104 additions & 0 deletions services/server/src/wss/handlers/call.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { cache } from "@/lib/cache";
import { canJoinCall } from "@/lib/call";
import { isGhost } from "@litespace/auth";
import { logger, safe } from "@litespace/sol";
import { ICall, Wss } from "@litespace/types";
import { WssHandler } from "@/wss/handlers/base";
import { asCallRoomId } from "@/wss/utils";

import zod from "zod";

const callTypes = ["lesson", "interview"] as const satisfies ICall.Type[];
const stdout = logger("wss");

const onJoinCallPayload = zod.object({
callId: zod.number(),
type: zod.enum(callTypes),
});
const onLeaveCallPayload = zod.object({ callId: zod.number() });

export class Call extends WssHandler {
public init(): Call {
this.socket.on(Wss.ClientEvent.JoinCall, this.onJoinCall.bind(this));
this.socket.on(Wss.ClientEvent.LeaveCall, this.onLeaveCall.bind(this));
return this;
}

/*
* This event listener will be called whenever a user
* joins a call. For instance, when
* users are joining a lesson, or a tutor is joining an
* interview.
*/
async onJoinCall(data: unknown) {
const result = await safe(async () => {
const { callId, type } = onJoinCallPayload.parse(data);

const user = this.user;
// todo: add ghost as a member of the call
if (!user) return stdout.error("user undefined!");
if (isGhost(user)) return stdout.warning("Unsupported");

stdout.info(`User ${user.id} is joining call ${callId}.`);

const canJoin = await canJoinCall({
userId: user.id,
callType: type,
callId,
});

if (!canJoin) throw Error("Forbidden");

// add user to the call by inserting row to call_members relation
await cache.call.addMember({ userId: user.id, callId: callId });
this.socket.join(asCallRoomId(callId));

stdout.info(`User ${user.id} has joined call ${callId}.`);

// TODO: retrieve user data (ICall.PopulatedMember) from the database
// discuss with the team if shall we retrieve it from postgres,
// or store it in redis in the first place.

// notify members that a new member has joined the call
// NOTE: the user notifies himself as well that he successfully joined the call.
this.broadcast(
Wss.ServerEvent.MemberJoinedCall,
asCallRoomId(callId),
{ userId: user.id } // TODO: define the payload struct type in the types package
);
});
if (result instanceof Error) stdout.error(result.message);
}

/*
* This event listener will be called whenever a user
* leaves a call. For instance, when users are leaving
* a lesson, or a tutor is leaving an interview.
*/
async onLeaveCall(data: unknown) {
const result = await safe(async () => {
const { callId } = onLeaveCallPayload.parse(data);

const user = this.user;
if (isGhost(user)) return;

stdout.info(`User ${user.id} is leaving call ${callId}.`);

// remove user from the call by deleting the corresponding row from call_members relation
await cache.call.removeMember({
userId: user.id,
callId: callId,
});

stdout.info(`User ${user.id} has left call ${callId}.`);

// notify members that a member has left the call
this.socket.broadcast
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
});
if (result instanceof Error) stdout.error(result.message);
}
}
124 changes: 124 additions & 0 deletions services/server/src/wss/handlers/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { isAdmin, isGhost, isStudent, isTutor } from "@litespace/auth";
import { dayjs, logger, safe } from "@litespace/sol";
import { IUser, Wss } from "@litespace/types";
import { WssHandler } from "@/wss/handlers/base";
import { calls, rooms, users } from "@litespace/models";
import { background } from "@/workers";
import { PartentPortMessage, PartentPortMessageType } from "@/workers/messages";
import { asCallRoomId, asChatRoomId } from "@/wss/utils";
import { cache } from "@/lib/cache";
import { getGhostCall } from "@litespace/sol/ghost";

const stdout = logger("wss");

export class Connection extends WssHandler {
public init(): Connection {
this.connect();
this.socket.on(Wss.ClientEvent.Disconnect, this.disconnect.bind(this));
return this;
}

async connect() {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const info = await users.update(user.id, { online: true });
this.announceStatus(info);

await this.joinRooms();
if (isAdmin(this.user)) this.emitServerStats();
});
if (error instanceof Error) stdout.error(error.message);
}

async disconnect() {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const info = await users.update(user.id, { online: false });
this.announceStatus(info);

await this.deregisterPeer();
await this.removeUserFromCalls();
});
if (error instanceof Error) stdout.error(error.message);
}

private async announceStatus(user: IUser.Self) {
const userRooms = await rooms.findMemberFullRoomIds(user.id);

for (const room of userRooms) {
this.broadcast(Wss.ServerEvent.UserStatusChanged, room.toString(), {
online: user.online,
});
}
}

private async emitServerStats() {
background.on("message", (message: PartentPortMessage) => {
if (message.type === PartentPortMessageType.Stats)
return this.socket.emit(Wss.ServerEvent.ServerStats, message.stats);
});
}

private async joinRooms() {
const error = await safe(async () => {
const user = this.user;
if (isGhost(user)) return;

const { list } = await rooms.findMemberRooms({ userId: user.id });

this.socket.join(list.map((roomId) => asChatRoomId(roomId)));
// private channel
this.socket.join(user.id.toString());

if (isStudent(this.user)) this.socket.join(Wss.Room.TutorsCache);
if (isAdmin(this.user)) this.socket.join(Wss.Room.ServerStats);

// todo: get user calls from cache
const callsList = await calls.find({
users: [user.id],
full: true,
after: dayjs.utc().startOf("day").toISOString(),
before: dayjs.utc().add(1, "day").toISOString(),
});

this.socket.join(callsList.list.map((call) => asCallRoomId(call.id)));
});

if (error instanceof Error) stdout.error(error.message);
}

/**
* Remove ghost and tutor peer id from the cache.
*
* @note should be called when the socket disconnects from the server.
*/
private async deregisterPeer() {
// todo: notify peers that the current user got disconnected
const user = this.user;
const display = isGhost(user) ? user : user.email;
stdout.info(`Deregister peer for: ${display}`);

if (isGhost(user))
return await cache.peer.removeGhostPeerId(getGhostCall(user));
if (isTutor(user)) await cache.peer.removeUserPeerId(user.id);
}

private async removeUserFromCalls() {
const user = this.user;
if (isGhost(user)) return;

const callId = await cache.call.removeMemberByUserId(user.id);
if (!callId) return;

// notify members that a member has left the call
this.socket.broadcast
.to(asCallRoomId(callId))
.emit(Wss.ServerEvent.MemberLeftCall, {
userId: user.id,
});
}
}
24 changes: 24 additions & 0 deletions services/server/src/wss/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Socket } from "socket.io";
import { IUser } from "@litespace/types";

import { Call } from "@/wss/handlers/call";
import { Connection } from "@/wss/handlers/connection";
import { Messages } from "@/wss/handlers/message";
import { Peer } from "@/wss/handlers/peer";
import { InputDevices } from "./inputDevices";

export class WssHandlers {
public readonly connection: Connection;
public readonly call: Call;
public readonly messages: Messages;
public readonly peer: Peer;
public readonly inputDevices: InputDevices;

constructor(socket: Socket, user: IUser.Self | IUser.Ghost) {
this.connection = new Connection(socket, user).init();
this.call = new Call(socket, user).init();
this.messages = new Messages(socket, user).init();
this.peer = new Peer(socket, user);
this.inputDevices = new InputDevices(socket, user).init();
}
}
52 changes: 52 additions & 0 deletions services/server/src/wss/handlers/inputDevices.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { logger, safe } from "@litespace/sol";
import { rooms } from "@litespace/models";
import { isGhost } from "@litespace/auth";
import { Wss } from "@litespace/types";
import { id, boolean } from "@/validation/utils";
import { WssHandler } from "@/wss/handlers/base";
import zod from "zod";
import { isEmpty } from "lodash";

const toggleCameraPayload = zod.object({ call: id, camera: boolean });
const toggleMicPayload = zod.object({ call: id, mic: boolean });

const stdout = logger("wss");

export class InputDevices extends WssHandler {
public init(): InputDevices {
this.socket.on(
Wss.ClientEvent.ToggleCamera,
this.onToggleCamera.bind(this)
);
this.socket.on(Wss.ClientEvent.ToggleMic, this.onToggleMic.bind(this));
return this;
}

async onToggleCamera(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
const { call, camera } = toggleCameraPayload.parse(data);
// todo: add validation
this.broadcast(Wss.ServerEvent.CameraToggled, call.toString(), {
user: user.id,
camera,
});
});
if (error instanceof Error) stdout.error(error.message);
}

async onToggleMic(data: unknown) {
const error = safe(async () => {
const user = this.user;
if (isGhost(user)) return;
const { call, mic } = toggleMicPayload.parse(data);
// todo: add validation
this.broadcast(Wss.ServerEvent.MicToggled, call.toString(), {
user: user.id,
mic,
});
});
if (error instanceof Error) stdout.error(error.message);
}
}
Loading

0 comments on commit 21355f0

Please sign in to comment.